reth_exex/
notifications.rs

1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use futures::{Stream, StreamExt};
4use reth_chainspec::Head;
5use reth_evm::execute::BlockExecutorProvider;
6use reth_exex_types::ExExHead;
7use reth_node_api::NodePrimitives;
8use reth_primitives::EthPrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12    fmt::Debug,
13    pin::Pin,
14    sync::Arc,
15    task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
20/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
21/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
22#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25    E: BlockExecutorProvider,
26{
27    inner: ExExNotificationsInner<P, E>,
28}
29
30/// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications
31/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
32/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
33pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
34    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
35{
36    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
37    ///
38    /// It's a no-op if the stream has already been configured without a head.
39    ///
40    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
41    fn set_without_head(&mut self);
42
43    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s with the provided
44    /// head.
45    ///
46    /// It's a no-op if the stream has already been configured with a head.
47    ///
48    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
49    fn set_with_head(&mut self, exex_head: ExExHead);
50
51    /// Returns a new [`ExExNotificationsStream`] without a head.
52    ///
53    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
54    fn without_head(self) -> Self
55    where
56        Self: Sized;
57
58    /// Returns a new [`ExExNotificationsStream`] with the provided head.
59    ///
60    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
61    fn with_head(self, exex_head: ExExHead) -> Self
62    where
63        Self: Sized;
64}
65
66#[derive(Debug)]
67enum ExExNotificationsInner<P, E>
68where
69    E: BlockExecutorProvider,
70{
71    /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
72    WithoutHead(ExExNotificationsWithoutHead<P, E>),
73    /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
74    /// are committed or reverted after the given head.
75    WithHead(ExExNotificationsWithHead<P, E>),
76    /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
77    /// [`ExExNotificationsInner::WithHead`].
78    Invalid,
79}
80
81impl<P, E> ExExNotifications<P, E>
82where
83    E: BlockExecutorProvider,
84{
85    /// Creates a new stream of [`ExExNotifications`] without a head.
86    pub const fn new(
87        node_head: Head,
88        provider: P,
89        executor: E,
90        notifications: Receiver<ExExNotification<E::Primitives>>,
91        wal_handle: WalHandle<E::Primitives>,
92    ) -> Self {
93        Self {
94            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
95                node_head,
96                provider,
97                executor,
98                notifications,
99                wal_handle,
100            )),
101        }
102    }
103}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
109        + Clone
110        + Unpin
111        + 'static,
112{
113    fn set_without_head(&mut self) {
114        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
115        self.inner = ExExNotificationsInner::WithoutHead(match current {
116            ExExNotificationsInner::WithoutHead(notifications) => notifications,
117            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
118                notifications.node_head,
119                notifications.provider,
120                notifications.executor,
121                notifications.notifications,
122                notifications.wal_handle,
123            ),
124            ExExNotificationsInner::Invalid => unreachable!(),
125        });
126    }
127
128    fn set_with_head(&mut self, exex_head: ExExHead) {
129        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
130        self.inner = ExExNotificationsInner::WithHead(match current {
131            ExExNotificationsInner::WithoutHead(notifications) => {
132                notifications.with_head(exex_head)
133            }
134            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new(
135                notifications.node_head,
136                notifications.provider,
137                notifications.executor,
138                notifications.notifications,
139                notifications.wal_handle,
140                exex_head,
141            ),
142            ExExNotificationsInner::Invalid => unreachable!(),
143        });
144    }
145
146    fn without_head(mut self) -> Self {
147        self.set_without_head();
148        self
149    }
150
151    fn with_head(mut self, exex_head: ExExHead) -> Self {
152        self.set_with_head(exex_head);
153        self
154    }
155}
156
157impl<P, E> Stream for ExExNotifications<P, E>
158where
159    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
160    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
161        + Clone
162        + Unpin
163        + 'static,
164{
165    type Item = eyre::Result<ExExNotification<E::Primitives>>;
166
167    fn poll_next(
168        self: std::pin::Pin<&mut Self>,
169        cx: &mut std::task::Context<'_>,
170    ) -> std::task::Poll<Option<Self::Item>> {
171        match &mut self.get_mut().inner {
172            ExExNotificationsInner::WithoutHead(notifications) => {
173                notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
174            }
175            ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
176            ExExNotificationsInner::Invalid => unreachable!(),
177        }
178    }
179}
180
181/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
182pub struct ExExNotificationsWithoutHead<P, E>
183where
184    E: BlockExecutorProvider,
185{
186    node_head: Head,
187    provider: P,
188    executor: E,
189    notifications: Receiver<ExExNotification<E::Primitives>>,
190    wal_handle: WalHandle<E::Primitives>,
191}
192
193impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
194where
195    E: Debug + BlockExecutorProvider,
196{
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        f.debug_struct("ExExNotifications")
199            .field("provider", &self.provider)
200            .field("executor", &self.executor)
201            .field("notifications", &self.notifications)
202            .finish()
203    }
204}
205
206impl<P, E> ExExNotificationsWithoutHead<P, E>
207where
208    E: BlockExecutorProvider,
209{
210    /// Creates a new instance of [`ExExNotificationsWithoutHead`].
211    const fn new(
212        node_head: Head,
213        provider: P,
214        executor: E,
215        notifications: Receiver<ExExNotification<E::Primitives>>,
216        wal_handle: WalHandle<E::Primitives>,
217    ) -> Self {
218        Self { node_head, provider, executor, notifications, wal_handle }
219    }
220
221    /// Subscribe to notifications with the given head.
222    fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
223        ExExNotificationsWithHead::new(
224            self.node_head,
225            self.provider,
226            self.executor,
227            self.notifications,
228            self.wal_handle,
229            head,
230        )
231    }
232}
233
234impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
235where
236    E: Unpin + BlockExecutorProvider,
237{
238    type Item = ExExNotification<E::Primitives>;
239
240    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241        self.get_mut().notifications.poll_recv(cx)
242    }
243}
244
245/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
246/// committed or reverted after the given head. The head is the ExEx's latest view of the host
247/// chain.
248///
249/// Notifications will be sent starting from the head, not inclusive. For example, if
250/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
251/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
252/// process block 11.
253#[derive(Debug)]
254pub struct ExExNotificationsWithHead<P, E>
255where
256    E: BlockExecutorProvider,
257{
258    node_head: Head,
259    provider: P,
260    executor: E,
261    notifications: Receiver<ExExNotification<E::Primitives>>,
262    wal_handle: WalHandle<E::Primitives>,
263    exex_head: ExExHead,
264    /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
265    /// revert its head.
266    pending_check_canonical: bool,
267    /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
268    /// the missing blocks.
269    pending_check_backfill: bool,
270    /// The backfill job to run before consuming any notifications.
271    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
272}
273
274impl<P, E> ExExNotificationsWithHead<P, E>
275where
276    E: BlockExecutorProvider,
277{
278    /// Creates a new [`ExExNotificationsWithHead`].
279    const fn new(
280        node_head: Head,
281        provider: P,
282        executor: E,
283        notifications: Receiver<ExExNotification<E::Primitives>>,
284        wal_handle: WalHandle<E::Primitives>,
285        exex_head: ExExHead,
286    ) -> Self {
287        Self {
288            node_head,
289            provider,
290            executor,
291            notifications,
292            wal_handle,
293            exex_head,
294            pending_check_canonical: true,
295            pending_check_backfill: true,
296            backfill_job: None,
297        }
298    }
299}
300
301impl<P, E> ExExNotificationsWithHead<P, E>
302where
303    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
304    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
305        + Clone
306        + Unpin
307        + 'static,
308{
309    /// Checks if the ExEx head is on the canonical chain.
310    ///
311    /// If the head block is not found in the database or it's ahead of the node head, it means
312    /// we're not on the canonical chain and we need to revert the notification with the ExEx
313    /// head block.
314    fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
315        if self.provider.is_known(&self.exex_head.block.hash)? &&
316            self.exex_head.block.number <= self.node_head.number
317        {
318            debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
319            return Ok(None)
320        }
321
322        // If the head block is not found in the database, it means we're not on the canonical
323        // chain.
324
325        // Get the committed notification for the head block from the WAL.
326        let Some(notification) =
327            self.wal_handle.get_committed_notification_by_block_hash(&self.exex_head.block.hash)?
328        else {
329            return Err(eyre::eyre!(
330                "Could not find notification for block hash {:?} in the WAL",
331                self.exex_head.block.hash
332            ))
333        };
334
335        // Update the head block hash to the parent hash of the first committed block.
336        let committed_chain = notification.committed_chain().unwrap();
337        let new_exex_head =
338            (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
339        debug!(target: "exex::notifications", old_exex_head = ?self.exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
340        self.exex_head.block = new_exex_head;
341
342        // Return an inverted notification. See the documentation for
343        // `ExExNotification::into_inverted`.
344        Ok(Some(notification.into_inverted()))
345    }
346
347    /// Compares the node head against the ExEx head, and backfills if needed.
348    ///
349    /// CAUTON: This method assumes that the ExEx head is <= the node head, and that it's on the
350    /// canonical chain.
351    ///
352    /// Possible situations are:
353    /// - ExEx is behind the node head (`node_head.number < exex_head.number`). Backfill from the
354    ///   node database.
355    /// - ExEx is at the same block number as the node head (`node_head.number ==
356    ///   exex_head.number`). Nothing to do.
357    fn check_backfill(&mut self) -> eyre::Result<()> {
358        let backfill_job_factory =
359            BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
360        match self.exex_head.block.number.cmp(&self.node_head.number) {
361            std::cmp::Ordering::Less => {
362                // ExEx is behind the node head, start backfill
363                debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
364                let backfill = backfill_job_factory
365                    .backfill(self.exex_head.block.number + 1..=self.node_head.number)
366                    .into_stream();
367                self.backfill_job = Some(backfill);
368            }
369            std::cmp::Ordering::Equal => {
370                debug!(target: "exex::notifications", "ExEx is at the node head");
371            }
372            std::cmp::Ordering::Greater => {
373                return Err(eyre::eyre!("ExEx is ahead of the node head"))
374            }
375        };
376
377        Ok(())
378    }
379}
380
381impl<P, E> Stream for ExExNotificationsWithHead<P, E>
382where
383    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
384    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
385        + Clone
386        + Unpin
387        + 'static,
388{
389    type Item = eyre::Result<ExExNotification<E::Primitives>>;
390
391    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
392        let this = self.get_mut();
393
394        if this.pending_check_canonical {
395            if let Some(canonical_notification) = this.check_canonical()? {
396                return Poll::Ready(Some(Ok(canonical_notification)))
397            }
398
399            // ExEx head is on the canonical chain, we no longer need to check it
400            this.pending_check_canonical = false;
401        }
402
403        if this.pending_check_backfill {
404            this.check_backfill()?;
405            this.pending_check_backfill = false;
406        }
407
408        if let Some(backfill_job) = &mut this.backfill_job {
409            debug!(target: "exex::notifications", "Polling backfill job");
410            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
411                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
412                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
413                    new: Arc::new(chain),
414                })))
415            }
416
417            // Backfill job is done, remove it
418            this.backfill_job = None;
419        }
420
421        let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
422            return Poll::Ready(None)
423        };
424
425        if let Some(committed_chain) = notification.committed_chain() {
426            this.exex_head.block = committed_chain.tip().num_hash();
427        } else if let Some(reverted_chain) = notification.reverted_chain() {
428            let first_block = reverted_chain.first();
429            this.exex_head.block = (first_block.parent_hash(), first_block.number() - 1).into();
430        }
431
432        Poll::Ready(Some(Ok(notification)))
433    }
434}
435
// Tests for the head-aware notification stream. Each test builds a provider on top of a fresh
// test database with genesis initialized, queues one live notification on the channel and then
// checks the exact sequence of notifications produced by `ExExNotificationsWithHead`.
#[cfg(test)]
mod tests {
    use crate::Wal;

    use super::*;
    use alloy_consensus::Header;
    use alloy_eips::BlockNumHash;
    use eyre::OptionExt;
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives::{Block, BlockExt};
    use reth_provider::{
        providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockWriter,
        Chain, DatabaseProviderFactory, StorageLocation,
    };
    use reth_testing_utils::generators::{self, random_block, BlockParams};
    use tokio::sync::mpsc;

    /// The ExEx head is at genesis while the node head is one block further, and the ExEx head
    /// is known to the provider: the stream must first backfill the missing block and only then
    /// forward the live notification.
    #[tokio::test(flavor = "multi_thread")]
    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory.clone())?;

        // The node head is a random child of genesis that gets written to the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let provider_rw = provider_factory.provider_rw()?;
        provider_rw.insert_block(
            node_head_block.clone().seal_with_senders().ok_or_eyre("failed to recover senders")?,
            StorageLocation::Database,
        )?;
        provider_rw.commit()?;

        let node_head = Head {
            number: node_head_block.number,
            hash: node_head_block.hash(),
            ..Default::default()
        };
        // The ExEx head stays at genesis, i.e. one block behind the node head.
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // Live notification: a commit of a random block on top of the node head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the backfill of missing blocks from the canonical chain
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(ExExNotification::ChainCommitted {
                new: Arc::new(
                    BackfillJobFactory::new(
                        notifications.executor.clone(),
                        notifications.provider.clone()
                    )
                    .backfill(1..=1)
                    .next()
                    .ok_or_eyre("failed to backfill")??
                )
            })
        );

        // Second notification is the actual notification that we sent before
        assert_eq!(notifications.next().await.transpose()?, Some(notification));

        Ok(())
    }

    /// The ExEx head equals the node head (both at genesis): nothing has to be reverted or
    /// backfilled, so the live notification comes out first.
    #[tokio::test]
    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory)?;

        // Both heads point at the genesis block.
        let node_head =
            Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
        let exex_head =
            ExExHead { block: BlockNumHash { number: node_head.number, hash: node_head.hash } };

        // Live notification: a commit of a default block on top of the node head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![Block {
                    header: Header {
                        parent_hash: node_head.hash,
                        number: node_head.number + 1,
                        ..Default::default()
                    },
                    ..Default::default()
                }
                .seal_slow()
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // The only notification is the one that was sent through the channel.
        let new_notification = notifications.next().await.transpose()?;
        assert_eq!(new_notification, Some(notification));

        Ok(())
    }

    /// The ExEx head has the same number as the node head but is a different block that was
    /// only committed to the WAL: the stream must revert the ExEx head block, backfill the node
    /// head block, and then forward the live notification.
    #[tokio::test(flavor = "multi_thread")]
    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory)?;

        // The node head is a random child of genesis that gets written to the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .seal_with_senders::<reth_primitives::Block>()
        .ok_or_eyre("failed to recover senders")?;
        let node_head = Head {
            number: node_head_block.number,
            hash: node_head_block.hash(),
            ..Default::default()
        };
        let provider_rw = provider.database_provider_rw()?;
        provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
        provider_rw.commit()?;
        // Expected backfill result for the node head block, computed with a separate job.
        let node_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(
                BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
                    .backfill(node_head.number..=node_head.number)
                    .next()
                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
            ),
        };

        // The ExEx head is another random child of genesis; it is committed to the WAL only and
        // never inserted into the database.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head = ExExHead { block: exex_head_block.num_hash() };
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block
                    .clone()
                    .seal_with_senders()
                    .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // Live notification: a commit of a random block on top of the node head.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the revert of the ExEx head block to get back to the canonical
        // chain
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );
        // Second notification is the backfilled block from the canonical chain to get back to the
        // canonical tip
        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
        // Third notification is the actual notification that we sent before
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }

    /// The ExEx head is one block ahead of the node head (which is at genesis) and was only
    /// committed to the WAL: the stream must revert the ExEx head block and then forward the
    /// live notification.
    #[tokio::test]
    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory)?;

        // The ExEx head is a random child of genesis; it is committed to the WAL only and never
        // inserted into the database.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block
                    .clone()
                    .seal_with_senders()
                    .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // The node head stays at genesis, i.e. one block behind the ExEx head.
        let node_head =
            Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
        let exex_head = ExExHead {
            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
        };

        // Live notification: a commit of another random child of genesis.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    genesis_block.number + 1,
                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
                )
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the revert of the ExEx head block to get back to the canonical
        // chain
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );

        // Second notification is the actual notification that we sent before
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
}