reth_exex/
manager.rs

1use crate::{
2    wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_chainspec::Head;
11use reth_evm::execute::BlockExecutorProvider;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives::{EthPrimitives, SealedHeader};
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18    collections::VecDeque,
19    fmt::Debug,
20    future::{poll_fn, Future},
21    ops::Not,
22    pin::Pin,
23    sync::{
24        atomic::{AtomicUsize, Ordering},
25        Arc,
26    },
27    task::{ready, Context, Poll},
28};
29use tokio::sync::{
30    mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31    watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
35/// Default max size of the internal state notifications buffer.
36///
37/// 1024 notifications in the buffer is 3.5 hours of mainnet blocks,
38/// or 17 minutes of 1-second blocks.
39pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
40
41/// The maximum number of blocks allowed in the WAL before emitting a warning.
42///
43/// This constant defines the threshold for the Write-Ahead Log (WAL) size. If the number of blocks
44/// in the WAL exceeds this limit, a warning is logged to indicate potential issues.
45pub const WAL_BLOCKS_WARNING: usize = 128;
46
47/// The source of the notification.
48///
49/// This distinguishment is needed to not commit any pipeline notificatations to [WAL](`Wal`),
50/// because they are already finalized.
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum ExExNotificationSource {
53    /// The notification was sent from the pipeline.
54    Pipeline,
55    /// The notification was sent from the blockchain tree.
56    BlockchainTree,
57}
58
59/// Metrics for an `ExEx`.
60#[derive(Metrics)]
61#[metrics(scope = "exex")]
62struct ExExMetrics {
63    /// The total number of notifications sent to an `ExEx`.
64    notifications_sent_total: Counter,
65    /// The total number of events an `ExEx` has sent to the manager.
66    events_sent_total: Counter,
67}
68
69/// A handle to an `ExEx` used by the [`ExExManager`] to communicate with `ExEx`'s.
70///
71/// A handle should be created for each `ExEx` with a unique ID. The channels returned by
72/// [`ExExHandle::new`] should be given to the `ExEx`, while the handle itself should be given to
73/// the manager in [`ExExManager::new`].
74#[derive(Debug)]
75pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
76    /// The execution extension's ID.
77    id: String,
78    /// Metrics for an `ExEx`.
79    metrics: ExExMetrics,
80    /// Channel to send [`ExExNotification`]s to the `ExEx`.
81    sender: PollSender<ExExNotification<N>>,
82    /// Channel to receive [`ExExEvent`]s from the `ExEx`.
83    receiver: UnboundedReceiver<ExExEvent>,
84    /// The ID of the next notification to send to this `ExEx`.
85    next_notification_id: usize,
86    /// The finished block of the `ExEx`.
87    ///
88    /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
89    finished_height: Option<BlockNumHash>,
90}
91
92impl<N: NodePrimitives> ExExHandle<N> {
93    /// Create a new handle for the given `ExEx`.
94    ///
95    /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
96    /// [`mpsc::Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`.
97    pub fn new<P, E: BlockExecutorProvider<Primitives = N>>(
98        id: String,
99        node_head: Head,
100        provider: P,
101        executor: E,
102        wal_handle: WalHandle<N>,
103    ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
104        let (notification_tx, notification_rx) = mpsc::channel(1);
105        let (event_tx, event_rx) = mpsc::unbounded_channel();
106        let notifications =
107            ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle);
108
109        (
110            Self {
111                id: id.clone(),
112                metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
113                sender: PollSender::new(notification_tx),
114                receiver: event_rx,
115                next_notification_id: 0,
116                finished_height: None,
117            },
118            event_tx,
119            notifications,
120        )
121    }
122
123    /// Reserves a slot in the `PollSender` channel and sends the notification if the slot was
124    /// successfully reserved.
125    ///
126    /// When the notification is sent, it is considered delivered.
127    fn send(
128        &mut self,
129        cx: &mut Context<'_>,
130        (notification_id, notification): &(usize, ExExNotification<N>),
131    ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
132        if let Some(finished_height) = self.finished_height {
133            match notification {
134                ExExNotification::ChainCommitted { new } => {
135                    // Skip the chain commit notification if the finished height of the ExEx is
136                    // higher than or equal to the tip of the new notification.
137                    // I.e., the ExEx has already processed the notification.
138                    if finished_height.number >= new.tip().number() {
139                        debug!(
140                            target: "exex::manager",
141                            exex_id = %self.id,
142                            %notification_id,
143                            ?finished_height,
144                            new_tip = %new.tip().number(),
145                            "Skipping notification"
146                        );
147
148                        self.next_notification_id = notification_id + 1;
149                        return Poll::Ready(Ok(()))
150                    }
151                }
152                // Do not handle [ExExNotification::ChainReorged] and
153                // [ExExNotification::ChainReverted] cases and always send the
154                // notification, because the ExEx should be aware of the reorgs and reverts lower
155                // than its finished height
156                ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
157            }
158        }
159
160        debug!(
161            target: "exex::manager",
162            exex_id = %self.id,
163            %notification_id,
164            "Reserving slot for notification"
165        );
166        match self.sender.poll_reserve(cx) {
167            Poll::Ready(Ok(())) => (),
168            other => return other,
169        }
170
171        debug!(
172            target: "exex::manager",
173            exex_id = %self.id,
174            %notification_id,
175            "Sending notification"
176        );
177        match self.sender.send_item(notification.clone()) {
178            Ok(()) => {
179                self.next_notification_id = notification_id + 1;
180                self.metrics.notifications_sent_total.increment(1);
181                Poll::Ready(Ok(()))
182            }
183            Err(err) => Poll::Ready(Err(err)),
184        }
185    }
186}
187
188/// Metrics for the `ExEx` manager.
189#[derive(Metrics)]
190#[metrics(scope = "exex.manager")]
191pub struct ExExManagerMetrics {
192    /// Max size of the internal state notifications buffer.
193    max_capacity: Gauge,
194    /// Current capacity of the internal state notifications buffer.
195    current_capacity: Gauge,
196    /// Current size of the internal state notifications buffer.
197    ///
198    /// Note that this might be slightly bigger than the maximum capacity in some cases.
199    buffer_size: Gauge,
200    /// Current number of `ExEx`'s on the node.
201    num_exexs: Gauge,
202}
203
204/// The execution extension manager.
205///
206/// The manager is responsible for:
207///
208/// - Receiving relevant events from the rest of the node, and sending these to the execution
209///   extensions
210/// - Backpressure
211/// - Error handling
212/// - Monitoring
213#[derive(Debug)]
214pub struct ExExManager<P, N: NodePrimitives> {
215    /// Provider for querying headers.
216    provider: P,
217
218    /// Handles to communicate with the `ExEx`'s.
219    exex_handles: Vec<ExExHandle<N>>,
220
221    /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
222    handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,
223
224    /// The minimum notification ID currently present in the buffer.
225    min_id: usize,
226    /// Monotonically increasing ID for [`ExExNotification`]s.
227    next_id: usize,
228    /// Internal buffer of [`ExExNotification`]s.
229    ///
230    /// The first element of the tuple is a monotonically increasing ID unique to the notification
231    /// (the second element of the tuple).
232    buffer: VecDeque<(usize, ExExNotification<N>)>,
233    /// Max size of the internal state notifications buffer.
234    max_capacity: usize,
235    /// Current state notifications buffer capacity.
236    ///
237    /// Used to inform the execution stage of possible batch sizes.
238    current_capacity: Arc<AtomicUsize>,
239
240    /// Whether the manager is ready to receive new notifications.
241    is_ready: watch::Sender<bool>,
242
243    /// The finished height of all `ExEx`'s.
244    finished_height: watch::Sender<FinishedExExHeight>,
245
246    /// Write-Ahead Log for the [`ExExNotification`]s.
247    wal: Wal<N>,
248    /// A stream of finalized headers.
249    finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
250
251    /// A handle to the `ExEx` manager.
252    handle: ExExManagerHandle<N>,
253    /// Metrics for the `ExEx` manager.
254    metrics: ExExManagerMetrics,
255}
256
257impl<P, N> ExExManager<P, N>
258where
259    N: NodePrimitives,
260{
261    /// Create a new [`ExExManager`].
262    ///
263    /// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the
264    /// notification buffer in the manager.
265    ///
266    /// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send
267    /// notifications over [`ExExManagerHandle`]s until there is capacity again.
268    pub fn new(
269        provider: P,
270        handles: Vec<ExExHandle<N>>,
271        max_capacity: usize,
272        wal: Wal<N>,
273        finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
274    ) -> Self {
275        let num_exexs = handles.len();
276
277        let (handle_tx, handle_rx) = mpsc::unbounded_channel();
278        let (is_ready_tx, is_ready_rx) = watch::channel(true);
279        let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
280            FinishedExExHeight::NoExExs
281        } else {
282            FinishedExExHeight::NotReady
283        });
284
285        let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
286
287        let metrics = ExExManagerMetrics::default();
288        metrics.max_capacity.set(max_capacity as f64);
289        metrics.num_exexs.set(num_exexs as f64);
290
291        Self {
292            provider,
293
294            exex_handles: handles,
295
296            handle_rx,
297
298            min_id: 0,
299            next_id: 0,
300            buffer: VecDeque::with_capacity(max_capacity),
301            max_capacity,
302            current_capacity: Arc::clone(&current_capacity),
303
304            is_ready: is_ready_tx,
305            finished_height: finished_height_tx,
306
307            wal,
308            finalized_header_stream,
309
310            handle: ExExManagerHandle {
311                exex_tx: handle_tx,
312                num_exexs,
313                is_ready_receiver: is_ready_rx.clone(),
314                is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
315                current_capacity,
316                finished_height: finished_height_rx,
317            },
318            metrics,
319        }
320    }
321
322    /// Returns the handle to the manager.
323    pub fn handle(&self) -> ExExManagerHandle<N> {
324        self.handle.clone()
325    }
326
327    /// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's
328    /// readiness to receive notifications.
329    fn update_capacity(&self) {
330        let capacity = self.max_capacity.saturating_sub(self.buffer.len());
331        self.current_capacity.store(capacity, Ordering::Relaxed);
332        self.metrics.current_capacity.set(capacity as f64);
333        self.metrics.buffer_size.set(self.buffer.len() as f64);
334
335        // we can safely ignore if the channel is closed, since the manager always holds it open
336        // internally
337        let _ = self.is_ready.send(capacity > 0);
338    }
339
340    /// Pushes a new notification into the managers internal buffer, assigning the notification a
341    /// unique ID.
342    fn push_notification(&mut self, notification: ExExNotification<N>) {
343        let next_id = self.next_id;
344        self.buffer.push_back((next_id, notification));
345        self.next_id += 1;
346    }
347}
348
349impl<P, N> ExExManager<P, N>
350where
351    P: HeaderProvider,
352    N: NodePrimitives,
353{
354    /// Finalizes the WAL according to the passed finalized header.
355    ///
356    /// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
357    /// necessary.
358    fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
359        debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
360
361        // Check if all ExExes are on the canonical chain
362        let exex_finished_heights = self
363            .exex_handles
364            .iter()
365            // Get ID and finished height for each ExEx
366            .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
367            // Deduplicate all hashes
368            .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
369            // Check if hashes are canonical
370            .map(|(exex_id, num_hash)| {
371                num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
372                    self.provider
373                        .is_known(&num_hash.hash)
374                        // Save the ExEx ID, finished height, and whether the hash is canonical
375                        .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
376                })
377            })
378            // We collect here to be able to log the unfinalized ExExes below
379            .collect::<Result<Vec<_>, _>>()?;
380        if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
381            // If there is a finalized header and all ExExs are on the canonical chain, finalize
382            // the WAL with either the lowest finished height among all ExExes, or finalized header
383            // – whichever is lower.
384            let lowest_finished_height = exex_finished_heights
385                .iter()
386                .copied()
387                .filter_map(|(_, num_hash, _)| num_hash)
388                .chain([(finalized_header.num_hash())])
389                .min_by_key(|num_hash| num_hash.number)
390                .unwrap();
391
392            self.wal.finalize(lowest_finished_height)?;
393            if self.wal.num_blocks() > WAL_BLOCKS_WARNING {
394                warn!(
395                    target: "exex::manager",
396                    blocks = ?self.wal.num_blocks(),
397                    "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
398                );
399            }
400        } else {
401            let unfinalized_exexes = exex_finished_heights
402                .into_iter()
403                .filter_map(|(exex_id, num_hash, is_canonical)| {
404                    is_canonical.not().then_some((exex_id, num_hash))
405                })
406                .format_with(", ", |(exex_id, num_hash), f| {
407                    f(&format_args!("{exex_id} = {num_hash:?}"))
408                })
409                // We need this because `debug!` uses the argument twice when formatting the final
410                // log message, but the result of `format_with` can only be used once
411                .to_string();
412            debug!(
413                target: "exex::manager",
414                %unfinalized_exexes,
415                "Not all ExExes are on the canonical chain, can't finalize the WAL"
416            );
417        }
418
419        Ok(())
420    }
421}
422
423impl<P, N> Future for ExExManager<P, N>
424where
425    P: HeaderProvider + Unpin + 'static,
426    N: NodePrimitives,
427{
428    type Output = eyre::Result<()>;
429
430    /// Main loop of the [`ExExManager`]. The order of operations is as follows:
431    /// 1. Handle incoming ExEx events. We do it before finalizing the WAL, because it depends on
432    ///    the latest state of [`ExExEvent::FinishedHeight`] events.
433    /// 2. Finalize the WAL with the finalized header, if necessary.
434    /// 3. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update
435    ///    the internal buffer capacity.
436    /// 5. Send notifications from the internal buffer to those ExExes that are ready to receive new
437    ///    notifications.
438    /// 5. Remove notifications from the internal buffer that have been sent to **all** ExExes and
439    ///    update the internal buffer capacity.
440    /// 6. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes.
441    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442        let this = self.get_mut();
443
444        // Handle incoming ExEx events
445        for exex in &mut this.exex_handles {
446            while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
447                debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
448                exex.metrics.events_sent_total.increment(1);
449                match event {
450                    ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
451                }
452            }
453        }
454
455        // Drain the finalized header stream and finalize the WAL with the last header
456        let mut last_finalized_header = None;
457        while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
458            last_finalized_header = finalized_header;
459        }
460        if let Some(header) = last_finalized_header {
461            this.finalize_wal(header)?;
462        }
463
464        // Drain handle notifications
465        while this.buffer.len() < this.max_capacity {
466            if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
467                let committed_tip =
468                    notification.committed_chain().map(|chain| chain.tip().number());
469                let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
470                debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
471
472                // Commit to WAL only notifications from blockchain tree. Pipeline notifications
473                // always contain only finalized blocks.
474                match source {
475                    ExExNotificationSource::BlockchainTree => {
476                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
477                        this.wal.commit(&notification)?;
478                    }
479                    ExExNotificationSource::Pipeline => {
480                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
481                    }
482                }
483
484                this.push_notification(notification);
485                continue
486            }
487            break
488        }
489
490        // Update capacity
491        this.update_capacity();
492
493        // Advance all poll senders
494        let mut min_id = usize::MAX;
495        for idx in (0..this.exex_handles.len()).rev() {
496            let mut exex = this.exex_handles.swap_remove(idx);
497
498            // It is a logic error for this to ever underflow since the manager manages the
499            // notification IDs
500            let notification_index = exex
501                .next_notification_id
502                .checked_sub(this.min_id)
503                .expect("exex expected notification ID outside the manager's range");
504            if let Some(notification) = this.buffer.get(notification_index) {
505                if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
506                    // The channel was closed, which is irrecoverable for the manager
507                    return Poll::Ready(Err(err.into()))
508                }
509            }
510            min_id = min_id.min(exex.next_notification_id);
511            this.exex_handles.push(exex);
512        }
513
514        // Remove processed buffered notifications
515        debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
516        this.buffer.retain(|&(id, _)| id >= min_id);
517        this.min_id = min_id;
518
519        // Update capacity
520        this.update_capacity();
521
522        // Update watch channel block number
523        let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
524            exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
525        });
526        if let Ok(finished_height) = finished_height {
527            let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
528        }
529
530        Poll::Pending
531    }
532}
533
534/// A handle to communicate with the [`ExExManager`].
535#[derive(Debug)]
536pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
537    /// Channel to send notifications to the `ExEx` manager.
538    exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
539    /// The number of `ExEx`'s running on the node.
540    num_exexs: usize,
541    /// A watch channel denoting whether the manager is ready for new notifications or not.
542    ///
543    /// This is stored internally alongside a `ReusableBoxFuture` representation of the same value.
544    /// This field is only used to create a new `ReusableBoxFuture` when the handle is cloned,
545    /// but is otherwise unused.
546    is_ready_receiver: watch::Receiver<bool>,
547    /// A reusable future that resolves when the manager is ready for new
548    /// notifications.
549    is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
550    /// The current capacity of the manager's internal notification buffer.
551    current_capacity: Arc<AtomicUsize>,
552    /// The finished height of all `ExEx`'s.
553    finished_height: watch::Receiver<FinishedExExHeight>,
554}
555
556impl<N: NodePrimitives> ExExManagerHandle<N> {
557    /// Creates an empty manager handle.
558    ///
559    /// Use this if there is no manager present.
560    ///
561    /// The handle will always be ready, and have a capacity of 0.
562    pub fn empty() -> Self {
563        let (exex_tx, _) = mpsc::unbounded_channel();
564        let (_, is_ready_rx) = watch::channel(true);
565        let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
566
567        Self {
568            exex_tx,
569            num_exexs: 0,
570            is_ready_receiver: is_ready_rx.clone(),
571            is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
572            current_capacity: Arc::new(AtomicUsize::new(0)),
573            finished_height: finished_height_rx,
574        }
575    }
576
577    /// Synchronously send a notification over the channel to all execution extensions.
578    ///
579    /// Senders should call [`Self::has_capacity`] first.
580    pub fn send(
581        &self,
582        source: ExExNotificationSource,
583        notification: ExExNotification<N>,
584    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
585        self.exex_tx.send((source, notification))
586    }
587
588    /// Asynchronously send a notification over the channel to all execution extensions.
589    ///
590    /// The returned future resolves when the notification has been delivered. If there is no
591    /// capacity in the channel, the future will wait.
592    pub async fn send_async(
593        &mut self,
594        source: ExExNotificationSource,
595        notification: ExExNotification<N>,
596    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
597        self.ready().await;
598        self.exex_tx.send((source, notification))
599    }
600
601    /// Get the current capacity of the `ExEx` manager's internal notification buffer.
602    pub fn capacity(&self) -> usize {
603        self.current_capacity.load(Ordering::Relaxed)
604    }
605
606    /// Whether there is capacity in the `ExEx` manager's internal notification buffer.
607    ///
608    /// If this returns `false`, the owner of the handle should **NOT** send new notifications over
609    /// the channel until the manager is ready again, as this can lead to unbounded memory growth.
610    pub fn has_capacity(&self) -> bool {
611        self.capacity() > 0
612    }
613
614    /// Returns `true` if there are `ExEx`'s installed in the node.
615    pub const fn has_exexs(&self) -> bool {
616        self.num_exexs > 0
617    }
618
619    /// The finished height of all `ExEx`'s.
620    pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
621        self.finished_height.clone()
622    }
623
624    /// Wait until the manager is ready for new notifications.
625    pub async fn ready(&mut self) {
626        poll_fn(|cx| self.poll_ready(cx)).await
627    }
628
629    /// Wait until the manager is ready for new notifications.
630    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
631        let rx = ready!(self.is_ready.poll(cx));
632        self.is_ready.set(make_wait_future(rx));
633        Poll::Ready(())
634    }
635}
636
637/// Creates a future that resolves once the given watch channel receiver is true.
638async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
639    // NOTE(onbjerg): We can ignore the error here, because if the channel is closed, the node
640    // is shutting down.
641    let _ = rx.wait_for(|ready| *ready).await;
642    rx
643}
644
645impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
646    fn clone(&self) -> Self {
647        Self {
648            exex_tx: self.exex_tx.clone(),
649            num_exexs: self.num_exexs,
650            is_ready_receiver: self.is_ready_receiver.clone(),
651            is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
652            current_capacity: self.current_capacity.clone(),
653            finished_height: self.finished_height.clone(),
654        }
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661    use alloy_primitives::B256;
662    use futures::{StreamExt, TryStreamExt};
663    use rand::Rng;
664    use reth_db_common::init::init_genesis;
665    use reth_evm::test_utils::MockExecutorProvider;
666    use reth_evm_ethereum::execute::EthExecutorProvider;
667    use reth_primitives::SealedBlockWithSenders;
668    use reth_provider::{
669        providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
670        BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant,
671    };
672    use reth_testing_utils::generators::{self, random_block, BlockParams};
673
674    fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
675        let (tx, rx) = watch::channel(None);
676        // Do not drop the sender, otherwise the receiver will always return an error
677        std::mem::forget(tx);
678        ForkChoiceStream::new(rx)
679    }
680
681    #[tokio::test]
682    async fn test_delivers_events() {
683        let temp_dir = tempfile::tempdir().unwrap();
684        let wal = Wal::new(temp_dir.path()).unwrap();
685
686        let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
687            "test_exex".to_string(),
688            Head::default(),
689            (),
690            MockExecutorProvider::default(),
691            wal.handle(),
692        );
693
694        // Send an event and check that it's delivered correctly
695        let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
696        event_tx.send(event).unwrap();
697        let received_event = exex_handle.receiver.recv().await.unwrap();
698        assert_eq!(received_event, event);
699    }
700
701    #[tokio::test]
702    async fn test_has_exexs() {
703        let temp_dir = tempfile::tempdir().unwrap();
704        let wal = Wal::new(temp_dir.path()).unwrap();
705
706        let (exex_handle_1, _, _) = ExExHandle::new(
707            "test_exex_1".to_string(),
708            Head::default(),
709            (),
710            MockExecutorProvider::default(),
711            wal.handle(),
712        );
713
714        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
715            .handle
716            .has_exexs());
717
718        assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
719            .handle
720            .has_exexs());
721    }
722
723    #[tokio::test]
724    async fn test_has_capacity() {
725        let temp_dir = tempfile::tempdir().unwrap();
726        let wal = Wal::new(temp_dir.path()).unwrap();
727
728        let (exex_handle_1, _, _) = ExExHandle::new(
729            "test_exex_1".to_string(),
730            Head::default(),
731            (),
732            MockExecutorProvider::default(),
733            wal.handle(),
734        );
735
736        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
737            .handle
738            .has_capacity());
739
740        assert!(ExExManager::new(
741            (),
742            vec![exex_handle_1],
743            10,
744            wal,
745            empty_finalized_header_stream()
746        )
747        .handle
748        .has_capacity());
749    }
750
751    #[test]
752    fn test_push_notification() {
753        let temp_dir = tempfile::tempdir().unwrap();
754        let wal = Wal::new(temp_dir.path()).unwrap();
755
756        let (exex_handle, _, _) = ExExHandle::new(
757            "test_exex".to_string(),
758            Head::default(),
759            (),
760            MockExecutorProvider::default(),
761            wal.handle(),
762        );
763
764        // Create a mock ExExManager and add the exex_handle to it
765        let mut exex_manager =
766            ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
767
768        // Define the notification for testing
769        let mut block1: SealedBlockWithSenders = Default::default();
770        block1.block.header.set_hash(B256::new([0x01; 32]));
771        block1.block.header.set_block_number(10);
772
773        let notification1 = ExExNotification::ChainCommitted {
774            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
775        };
776
777        // Push the first notification
778        exex_manager.push_notification(notification1.clone());
779
780        // Verify the buffer contains the notification with the correct ID
781        assert_eq!(exex_manager.buffer.len(), 1);
782        assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
783        assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
784        assert_eq!(exex_manager.next_id, 1);
785
786        // Push another notification
787        let mut block2: SealedBlockWithSenders = Default::default();
788        block2.block.header.set_hash(B256::new([0x02; 32]));
789        block2.block.header.set_block_number(20);
790
791        let notification2 = ExExNotification::ChainCommitted {
792            new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
793        };
794
795        exex_manager.push_notification(notification2.clone());
796
797        // Verify the buffer contains both notifications with correct IDs
798        assert_eq!(exex_manager.buffer.len(), 2);
799        assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
800        assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
801        assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
802        assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
803        assert_eq!(exex_manager.next_id, 2);
804    }
805
806    #[test]
807    fn test_update_capacity() {
808        let temp_dir = tempfile::tempdir().unwrap();
809        let wal = Wal::new(temp_dir.path()).unwrap();
810
811        let (exex_handle, _, _) = ExExHandle::new(
812            "test_exex".to_string(),
813            Head::default(),
814            (),
815            MockExecutorProvider::default(),
816            wal.handle(),
817        );
818
819        // Create a mock ExExManager and add the exex_handle to it
820        let max_capacity = 5;
821        let mut exex_manager = ExExManager::new(
822            (),
823            vec![exex_handle],
824            max_capacity,
825            wal,
826            empty_finalized_header_stream(),
827        );
828
829        // Push some notifications to fill part of the buffer
830        let mut block1: SealedBlockWithSenders = Default::default();
831        block1.block.header.set_hash(B256::new([0x01; 32]));
832        block1.block.header.set_block_number(10);
833
834        let notification1 = ExExNotification::ChainCommitted {
835            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
836        };
837
838        exex_manager.push_notification(notification1.clone());
839        exex_manager.push_notification(notification1);
840
841        // Update capacity
842        exex_manager.update_capacity();
843
844        // Verify current capacity and metrics
845        assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);
846
847        // Clear the buffer and update capacity
848        exex_manager.buffer.clear();
849        exex_manager.update_capacity();
850
851        // Verify current capacity
852        assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
853    }
854
855    #[tokio::test]
856    async fn test_updates_block_height() {
857        let temp_dir = tempfile::tempdir().unwrap();
858        let wal = Wal::new(temp_dir.path()).unwrap();
859
860        let provider_factory = create_test_provider_factory();
861
862        let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
863            "test_exex".to_string(),
864            Head::default(),
865            (),
866            MockExecutorProvider::default(),
867            wal.handle(),
868        );
869
870        // Check initial block height
871        assert!(exex_handle.finished_height.is_none());
872
873        // Update the block height via an event
874        let block = BlockNumHash::new(42, B256::random());
875        event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
876
877        // Create a mock ExExManager and add the exex_handle to it
878        let exex_manager = ExExManager::new(
879            provider_factory,
880            vec![exex_handle],
881            10,
882            Wal::new(temp_dir.path()).unwrap(),
883            empty_finalized_header_stream(),
884        );
885
886        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
887
888        // Pin the ExExManager to call the poll method
889        let mut pinned_manager = std::pin::pin!(exex_manager);
890        let _ = pinned_manager.as_mut().poll(&mut cx);
891
892        // Check that the block height was updated
893        let updated_exex_handle = &pinned_manager.exex_handles[0];
894        assert_eq!(updated_exex_handle.finished_height, Some(block));
895
896        // Get the receiver for the finished height
897        let mut receiver = pinned_manager.handle.finished_height();
898
899        // Wait for a new value to be sent
900        receiver.changed().await.unwrap();
901
902        // Get the latest value
903        let finished_height = *receiver.borrow();
904
905        // The finished height should be updated to the lower block height
906        assert_eq!(finished_height, FinishedExExHeight::Height(42));
907    }
908
909    #[tokio::test]
910    async fn test_updates_block_height_lower() {
911        let temp_dir = tempfile::tempdir().unwrap();
912        let wal = Wal::new(temp_dir.path()).unwrap();
913
914        let provider_factory = create_test_provider_factory();
915
916        // Create two `ExExHandle` instances
917        let (exex_handle1, event_tx1, _) = ExExHandle::new(
918            "test_exex1".to_string(),
919            Head::default(),
920            (),
921            MockExecutorProvider::default(),
922            wal.handle(),
923        );
924        let (exex_handle2, event_tx2, _) = ExExHandle::new(
925            "test_exex2".to_string(),
926            Head::default(),
927            (),
928            MockExecutorProvider::default(),
929            wal.handle(),
930        );
931
932        let block1 = BlockNumHash::new(42, B256::random());
933        let block2 = BlockNumHash::new(10, B256::random());
934
935        // Send events to update the block heights of the two handles, with the second being lower
936        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
937        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
938
939        let exex_manager = ExExManager::new(
940            provider_factory,
941            vec![exex_handle1, exex_handle2],
942            10,
943            Wal::new(temp_dir.path()).unwrap(),
944            empty_finalized_header_stream(),
945        );
946
947        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
948
949        let mut pinned_manager = std::pin::pin!(exex_manager);
950
951        let _ = pinned_manager.as_mut().poll(&mut cx);
952
953        // Get the receiver for the finished height
954        let mut receiver = pinned_manager.handle.finished_height();
955
956        // Wait for a new value to be sent
957        receiver.changed().await.unwrap();
958
959        // Get the latest value
960        let finished_height = *receiver.borrow();
961
962        // The finished height should be updated to the lower block height
963        assert_eq!(finished_height, FinishedExExHeight::Height(10));
964    }
965
966    #[tokio::test]
967    async fn test_updates_block_height_greater() {
968        let temp_dir = tempfile::tempdir().unwrap();
969        let wal = Wal::new(temp_dir.path()).unwrap();
970
971        let provider_factory = create_test_provider_factory();
972
973        // Create two `ExExHandle` instances
974        let (exex_handle1, event_tx1, _) = ExExHandle::new(
975            "test_exex1".to_string(),
976            Head::default(),
977            (),
978            MockExecutorProvider::default(),
979            wal.handle(),
980        );
981        let (exex_handle2, event_tx2, _) = ExExHandle::new(
982            "test_exex2".to_string(),
983            Head::default(),
984            (),
985            MockExecutorProvider::default(),
986            wal.handle(),
987        );
988
989        // Assert that the initial block height is `None` for the first `ExExHandle`.
990        assert!(exex_handle1.finished_height.is_none());
991
992        let block1 = BlockNumHash::new(42, B256::random());
993        let block2 = BlockNumHash::new(100, B256::random());
994
995        // Send events to update the block heights of the two handles, with the second being higher.
996        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
997        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
998
999        let exex_manager = ExExManager::new(
1000            provider_factory,
1001            vec![exex_handle1, exex_handle2],
1002            10,
1003            Wal::new(temp_dir.path()).unwrap(),
1004            empty_finalized_header_stream(),
1005        );
1006
1007        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1008
1009        let mut pinned_manager = std::pin::pin!(exex_manager);
1010
1011        let _ = pinned_manager.as_mut().poll(&mut cx);
1012
1013        // Get the receiver for the finished height
1014        let mut receiver = pinned_manager.handle.finished_height();
1015
1016        // Wait for a new value to be sent
1017        receiver.changed().await.unwrap();
1018
1019        // Get the latest value
1020        let finished_height = *receiver.borrow();
1021
1022        // The finished height should be updated to the lower block height
1023        assert_eq!(finished_height, FinishedExExHeight::Height(42));
1024
1025        // // The lower block height should be retained
1026        // let updated_exex_handle = &pinned_manager.exex_handles[0];
1027        // assert_eq!(updated_exex_handle.finished_height, Some(42));
1028    }
1029
1030    #[tokio::test]
1031    async fn test_exex_manager_capacity() {
1032        let temp_dir = tempfile::tempdir().unwrap();
1033        let wal = Wal::new(temp_dir.path()).unwrap();
1034
1035        let provider_factory = create_test_provider_factory();
1036
1037        let (exex_handle_1, _, _) = ExExHandle::new(
1038            "test_exex_1".to_string(),
1039            Head::default(),
1040            (),
1041            MockExecutorProvider::default(),
1042            wal.handle(),
1043        );
1044
1045        // Create an ExExManager with a small max capacity
1046        let max_capacity = 2;
1047        let mut exex_manager = ExExManager::new(
1048            provider_factory,
1049            vec![exex_handle_1],
1050            max_capacity,
1051            Wal::new(temp_dir.path()).unwrap(),
1052            empty_finalized_header_stream(),
1053        );
1054
1055        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1056
1057        // Setup a notification
1058        let notification = ExExNotification::ChainCommitted {
1059            new: Arc::new(Chain::new(
1060                vec![Default::default()],
1061                Default::default(),
1062                Default::default(),
1063            )),
1064        };
1065
1066        // Send notifications to go over the max capacity
1067        exex_manager
1068            .handle
1069            .exex_tx
1070            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1071            .unwrap();
1072        exex_manager
1073            .handle
1074            .exex_tx
1075            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1076            .unwrap();
1077        exex_manager
1078            .handle
1079            .exex_tx
1080            .send((ExExNotificationSource::BlockchainTree, notification))
1081            .unwrap();
1082
1083        // Pin the ExExManager to call the poll method
1084        let mut pinned_manager = std::pin::pin!(exex_manager);
1085
1086        // Before polling, the next notification ID should be 0 and the buffer should be empty
1087        assert_eq!(pinned_manager.next_id, 0);
1088        assert_eq!(pinned_manager.buffer.len(), 0);
1089
1090        let _ = pinned_manager.as_mut().poll(&mut cx);
1091
1092        // After polling, the next notification ID and buffer size should be updated
1093        assert_eq!(pinned_manager.next_id, 2);
1094        assert_eq!(pinned_manager.buffer.len(), 2);
1095    }
1096
1097    #[tokio::test]
1098    async fn exex_handle_new() {
1099        let provider_factory = create_test_provider_factory();
1100        init_genesis(&provider_factory).unwrap();
1101        let provider = BlockchainProvider2::new(provider_factory).unwrap();
1102
1103        let temp_dir = tempfile::tempdir().unwrap();
1104        let wal = Wal::new(temp_dir.path()).unwrap();
1105
1106        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1107            "test_exex".to_string(),
1108            Head::default(),
1109            provider,
1110            EthExecutorProvider::mainnet(),
1111            wal.handle(),
1112        );
1113
1114        // Check initial state
1115        assert_eq!(exex_handle.id, "test_exex");
1116        assert_eq!(exex_handle.next_notification_id, 0);
1117
1118        // Setup two blocks for the chain commit notification
1119        let mut block1: SealedBlockWithSenders = Default::default();
1120        block1.block.header.set_hash(B256::new([0x01; 32]));
1121        block1.block.header.set_block_number(10);
1122
1123        let mut block2: SealedBlockWithSenders = Default::default();
1124        block2.block.header.set_hash(B256::new([0x02; 32]));
1125        block2.block.header.set_block_number(11);
1126
1127        // Setup a notification
1128        let notification = ExExNotification::ChainCommitted {
1129            new: Arc::new(Chain::new(
1130                vec![block1.clone(), block2.clone()],
1131                Default::default(),
1132                Default::default(),
1133            )),
1134        };
1135
1136        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1137
1138        // Send a notification and ensure it's received correctly
1139        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1140            Poll::Ready(Ok(())) => {
1141                let received_notification = notifications.next().await.unwrap().unwrap();
1142                assert_eq!(received_notification, notification);
1143            }
1144            Poll::Pending => panic!("Notification send is pending"),
1145            Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e),
1146        }
1147
1148        // Ensure the notification ID was incremented
1149        assert_eq!(exex_handle.next_notification_id, 23);
1150    }
1151
1152    #[tokio::test]
1153    async fn test_notification_if_finished_height_gt_chain_tip() {
1154        let provider_factory = create_test_provider_factory();
1155        init_genesis(&provider_factory).unwrap();
1156        let provider = BlockchainProvider2::new(provider_factory).unwrap();
1157
1158        let temp_dir = tempfile::tempdir().unwrap();
1159        let wal = Wal::new(temp_dir.path()).unwrap();
1160
1161        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1162            "test_exex".to_string(),
1163            Head::default(),
1164            provider,
1165            EthExecutorProvider::mainnet(),
1166            wal.handle(),
1167        );
1168
1169        // Set finished_height to a value higher than the block tip
1170        exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
1171
1172        let mut block1: SealedBlockWithSenders = Default::default();
1173        block1.block.header.set_hash(B256::new([0x01; 32]));
1174        block1.block.header.set_block_number(10);
1175
1176        let notification = ExExNotification::ChainCommitted {
1177            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
1178        };
1179
1180        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1181
1182        // Send the notification
1183        match exex_handle.send(&mut cx, &(22, notification)) {
1184            Poll::Ready(Ok(())) => {
1185                poll_fn(|cx| {
1186                    // The notification should be skipped, so nothing should be sent.
1187                    // Check that the receiver channel is indeed empty
1188                    assert!(notifications.poll_next_unpin(cx).is_pending());
1189                    Poll::Ready(())
1190                })
1191                .await;
1192            }
1193            Poll::Pending | Poll::Ready(Err(_)) => {
1194                panic!("Notification should not be pending or fail");
1195            }
1196        }
1197
1198        // Ensure the notification ID was still incremented
1199        assert_eq!(exex_handle.next_notification_id, 23);
1200    }
1201
1202    #[tokio::test]
1203    async fn test_sends_chain_reorged_notification() {
1204        let provider_factory = create_test_provider_factory();
1205        init_genesis(&provider_factory).unwrap();
1206        let provider = BlockchainProvider2::new(provider_factory).unwrap();
1207
1208        let temp_dir = tempfile::tempdir().unwrap();
1209        let wal = Wal::new(temp_dir.path()).unwrap();
1210
1211        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1212            "test_exex".to_string(),
1213            Head::default(),
1214            provider,
1215            EthExecutorProvider::mainnet(),
1216            wal.handle(),
1217        );
1218
1219        let notification = ExExNotification::ChainReorged {
1220            old: Arc::new(Chain::default()),
1221            new: Arc::new(Chain::default()),
1222        };
1223
1224        // Even if the finished height is higher than the tip of the new chain, the reorg
1225        // notification should be received
1226        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1227
1228        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1229
1230        // Send the notification
1231        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1232            Poll::Ready(Ok(())) => {
1233                let received_notification = notifications.next().await.unwrap().unwrap();
1234                assert_eq!(received_notification, notification);
1235            }
1236            Poll::Pending | Poll::Ready(Err(_)) => {
1237                panic!("Notification should not be pending or fail")
1238            }
1239        }
1240
1241        // Ensure the notification ID was incremented
1242        assert_eq!(exex_handle.next_notification_id, 23);
1243    }
1244
1245    #[tokio::test]
1246    async fn test_sends_chain_reverted_notification() {
1247        let provider_factory = create_test_provider_factory();
1248        init_genesis(&provider_factory).unwrap();
1249        let provider = BlockchainProvider2::new(provider_factory).unwrap();
1250
1251        let temp_dir = tempfile::tempdir().unwrap();
1252        let wal = Wal::new(temp_dir.path()).unwrap();
1253
1254        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1255            "test_exex".to_string(),
1256            Head::default(),
1257            provider,
1258            EthExecutorProvider::mainnet(),
1259            wal.handle(),
1260        );
1261
1262        let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
1263
1264        // Even if the finished height is higher than the tip of the new chain, the reorg
1265        // notification should be received
1266        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1267
1268        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1269
1270        // Send the notification
1271        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1272            Poll::Ready(Ok(())) => {
1273                let received_notification = notifications.next().await.unwrap().unwrap();
1274                assert_eq!(received_notification, notification);
1275            }
1276            Poll::Pending | Poll::Ready(Err(_)) => {
1277                panic!("Notification should not be pending or fail")
1278            }
1279        }
1280
1281        // Ensure the notification ID was incremented
1282        assert_eq!(exex_handle.next_notification_id, 23);
1283    }
1284
1285    #[tokio::test]
1286    async fn test_exex_wal() -> eyre::Result<()> {
1287        reth_tracing::init_test_tracing();
1288
1289        let mut rng = generators::rng();
1290
1291        let provider_factory = create_test_provider_factory();
1292        let genesis_hash = init_genesis(&provider_factory).unwrap();
1293        let genesis_block = provider_factory
1294            .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
1295            .unwrap()
1296            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
1297
1298        let block = random_block(
1299            &mut rng,
1300            genesis_block.number + 1,
1301            BlockParams { parent: Some(genesis_hash), ..Default::default() },
1302        )
1303        .seal_with_senders::<reth_primitives::Block>()
1304        .unwrap();
1305        let provider_rw = provider_factory.database_provider_rw().unwrap();
1306        provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap();
1307        provider_rw.commit().unwrap();
1308
1309        let provider = BlockchainProvider2::new(provider_factory).unwrap();
1310
1311        let temp_dir = tempfile::tempdir().unwrap();
1312        let wal = Wal::new(temp_dir.path()).unwrap();
1313
1314        let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
1315            "test_exex".to_string(),
1316            Head::default(),
1317            provider.clone(),
1318            EthExecutorProvider::mainnet(),
1319            wal.handle(),
1320        );
1321
1322        let genesis_notification = ExExNotification::ChainCommitted {
1323            new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
1324        };
1325        let notification = ExExNotification::ChainCommitted {
1326            new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
1327        };
1328
1329        let (finalized_headers_tx, rx) = watch::channel(None);
1330        finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
1331        let finalized_header_stream = ForkChoiceStream::new(rx);
1332
1333        let mut exex_manager = std::pin::pin!(ExExManager::new(
1334            provider,
1335            vec![exex_handle],
1336            2,
1337            wal,
1338            finalized_header_stream
1339        ));
1340
1341        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1342
1343        exex_manager
1344            .handle()
1345            .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
1346        exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
1347
1348        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1349        assert_eq!(
1350            notifications.try_poll_next_unpin(&mut cx)?,
1351            Poll::Ready(Some(genesis_notification))
1352        );
1353        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1354        assert_eq!(
1355            notifications.try_poll_next_unpin(&mut cx)?,
1356            Poll::Ready(Some(notification.clone()))
1357        );
1358        // WAL shouldn't contain the genesis notification, because it's finalized
1359        assert_eq!(
1360            exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
1361            [notification.clone()]
1362        );
1363
1364        finalized_headers_tx.send(Some(block.header.clone()))?;
1365        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1366        // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
1367        assert_eq!(
1368            exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
1369            [notification.clone()]
1370        );
1371
1372        // Send a `FinishedHeight` event with a non-canonical block
1373        events_tx
1374            .send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
1375            .unwrap();
1376
1377        finalized_headers_tx.send(Some(block.header.clone()))?;
1378        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1379        // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
1380        // non-canonical block
1381        assert_eq!(
1382            exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
1383            [notification]
1384        );
1385
1386        // Send a `FinishedHeight` event with a canonical block
1387        events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
1388
1389        finalized_headers_tx.send(Some(block.header.clone()))?;
1390        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1391        // WAL is finalized
1392        assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
1393
1394        Ok(())
1395    }
1396}