reth_exex/
notifications.rs

1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12    fmt::Debug,
13    pin::Pin,
14    sync::Arc,
15    task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
20/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
21/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
22#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25    E: ConfigureEvm,
26{
27    inner: ExExNotificationsInner<P, E>,
28}
29
30/// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications
31/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
32/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
33pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
34    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
35{
36    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
37    ///
38    /// It's a no-op if the stream has already been configured without a head.
39    ///
40    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
41    fn set_without_head(&mut self);
42
43    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s with the provided
44    /// head.
45    ///
46    /// It's a no-op if the stream has already been configured with a head.
47    ///
48    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
49    fn set_with_head(&mut self, exex_head: ExExHead);
50
51    /// Returns a new [`ExExNotificationsStream`] without a head.
52    ///
53    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
54    fn without_head(self) -> Self
55    where
56        Self: Sized;
57
58    /// Returns a new [`ExExNotificationsStream`] with the provided head.
59    ///
60    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
61    fn with_head(self, exex_head: ExExHead) -> Self
62    where
63        Self: Sized;
64}
65
66#[derive(Debug)]
67enum ExExNotificationsInner<P, E>
68where
69    E: ConfigureEvm,
70{
71    /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
72    WithoutHead(ExExNotificationsWithoutHead<P, E>),
73    /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
74    /// are committed or reverted after the given head.
75    WithHead(Box<ExExNotificationsWithHead<P, E>>),
76    /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
77    /// [`ExExNotificationsInner::WithHead`].
78    Invalid,
79}
80
81impl<P, E> ExExNotifications<P, E>
82where
83    E: ConfigureEvm,
84{
85    /// Creates a new stream of [`ExExNotifications`] without a head.
86    pub const fn new(
87        node_head: BlockNumHash,
88        provider: P,
89        evm_config: E,
90        notifications: Receiver<ExExNotification<E::Primitives>>,
91        wal_handle: WalHandle<E::Primitives>,
92    ) -> Self {
93        Self {
94            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
95                node_head,
96                provider,
97                evm_config,
98                notifications,
99                wal_handle,
100            )),
101        }
102    }
103}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
109{
110    fn set_without_head(&mut self) {
111        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
112        self.inner = ExExNotificationsInner::WithoutHead(match current {
113            ExExNotificationsInner::WithoutHead(notifications) => notifications,
114            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
115                notifications.initial_local_head,
116                notifications.provider,
117                notifications.evm_config,
118                notifications.notifications,
119                notifications.wal_handle,
120            ),
121            ExExNotificationsInner::Invalid => unreachable!(),
122        });
123    }
124
125    fn set_with_head(&mut self, exex_head: ExExHead) {
126        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
127        self.inner = ExExNotificationsInner::WithHead(match current {
128            ExExNotificationsInner::WithoutHead(notifications) => {
129                Box::new(notifications.with_head(exex_head))
130            }
131            ExExNotificationsInner::WithHead(notifications) => {
132                Box::new(ExExNotificationsWithHead::new(
133                    notifications.initial_local_head,
134                    notifications.provider,
135                    notifications.evm_config,
136                    notifications.notifications,
137                    notifications.wal_handle,
138                    exex_head,
139                ))
140            }
141            ExExNotificationsInner::Invalid => unreachable!(),
142        });
143    }
144
145    fn without_head(mut self) -> Self {
146        self.set_without_head();
147        self
148    }
149
150    fn with_head(mut self, exex_head: ExExHead) -> Self {
151        self.set_with_head(exex_head);
152        self
153    }
154}
155
156impl<P, E> Stream for ExExNotifications<P, E>
157where
158    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
159    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
160{
161    type Item = eyre::Result<ExExNotification<E::Primitives>>;
162
163    fn poll_next(
164        self: std::pin::Pin<&mut Self>,
165        cx: &mut std::task::Context<'_>,
166    ) -> std::task::Poll<Option<Self::Item>> {
167        match &mut self.get_mut().inner {
168            ExExNotificationsInner::WithoutHead(notifications) => {
169                notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
170            }
171            ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
172            ExExNotificationsInner::Invalid => unreachable!(),
173        }
174    }
175}
176
177/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
178pub struct ExExNotificationsWithoutHead<P, E>
179where
180    E: ConfigureEvm,
181{
182    node_head: BlockNumHash,
183    provider: P,
184    evm_config: E,
185    notifications: Receiver<ExExNotification<E::Primitives>>,
186    wal_handle: WalHandle<E::Primitives>,
187}
188
189impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
190where
191    E: ConfigureEvm + Debug,
192{
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("ExExNotifications")
195            .field("provider", &self.provider)
196            .field("evm_config", &self.evm_config)
197            .field("notifications", &self.notifications)
198            .finish()
199    }
200}
201
202impl<P, E> ExExNotificationsWithoutHead<P, E>
203where
204    E: ConfigureEvm,
205{
206    /// Creates a new instance of [`ExExNotificationsWithoutHead`].
207    const fn new(
208        node_head: BlockNumHash,
209        provider: P,
210        evm_config: E,
211        notifications: Receiver<ExExNotification<E::Primitives>>,
212        wal_handle: WalHandle<E::Primitives>,
213    ) -> Self {
214        Self { node_head, provider, evm_config, notifications, wal_handle }
215    }
216
217    /// Subscribe to notifications with the given head.
218    fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
219        ExExNotificationsWithHead::new(
220            self.node_head,
221            self.provider,
222            self.evm_config,
223            self.notifications,
224            self.wal_handle,
225            head,
226        )
227    }
228}
229
230impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
231where
232    E: ConfigureEvm,
233{
234    type Item = ExExNotification<E::Primitives>;
235
236    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
237        self.get_mut().notifications.poll_recv(cx)
238    }
239}
240
241/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
242/// committed or reverted after the given head. The head is the ExEx's latest view of the host
243/// chain.
244///
245/// Notifications will be sent starting from the head, not inclusive. For example, if
246/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
247/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
248/// process block 11.
249#[derive(Debug)]
250pub struct ExExNotificationsWithHead<P, E>
251where
252    E: ConfigureEvm,
253{
254    /// The node's local head at launch.
255    initial_local_head: BlockNumHash,
256    provider: P,
257    evm_config: E,
258    notifications: Receiver<ExExNotification<E::Primitives>>,
259    wal_handle: WalHandle<E::Primitives>,
260    /// The exex head at launch
261    initial_exex_head: ExExHead,
262
263    /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
264    /// revert its head.
265    pending_check_canonical: bool,
266    /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
267    /// the missing blocks.
268    pending_check_backfill: bool,
269    /// The backfill job to run before consuming any notifications.
270    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
271}
272
273impl<P, E> ExExNotificationsWithHead<P, E>
274where
275    E: ConfigureEvm,
276{
277    /// Creates a new [`ExExNotificationsWithHead`].
278    const fn new(
279        node_head: BlockNumHash,
280        provider: P,
281        evm_config: E,
282        notifications: Receiver<ExExNotification<E::Primitives>>,
283        wal_handle: WalHandle<E::Primitives>,
284        exex_head: ExExHead,
285    ) -> Self {
286        Self {
287            initial_local_head: node_head,
288            provider,
289            evm_config,
290            notifications,
291            wal_handle,
292            initial_exex_head: exex_head,
293            pending_check_canonical: true,
294            pending_check_backfill: true,
295            backfill_job: None,
296        }
297    }
298}
299
300impl<P, E> ExExNotificationsWithHead<P, E>
301where
302    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
303    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
304{
305    /// Checks if the ExEx head is on the canonical chain.
306    ///
307    /// If the head block is not found in the database or it's ahead of the node head, it means
308    /// we're not on the canonical chain and we need to revert the notification with the ExEx
309    /// head block.
310    fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
311        if self.provider.is_known(&self.initial_exex_head.block.hash)? &&
312            self.initial_exex_head.block.number <= self.initial_local_head.number
313        {
314            // we have the targeted block and that block is below the current head
315            debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
316            return Ok(None)
317        }
318
319        // If the head block is not found in the database, it means we're not on the canonical
320        // chain.
321
322        // Get the committed notification for the head block from the WAL.
323        let Some(notification) = self
324            .wal_handle
325            .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
326        else {
327            // it's possible that the exex head is further ahead
328            if self.initial_exex_head.block.number > self.initial_local_head.number {
329                debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
330                return Ok(None);
331            }
332
333            return Err(eyre::eyre!(
334                "Could not find notification for block hash {:?} in the WAL",
335                self.initial_exex_head.block.hash
336            ))
337        };
338
339        // Update the head block hash to the parent hash of the first committed block.
340        let committed_chain = notification.committed_chain().unwrap();
341        let new_exex_head =
342            (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
343        debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
344        self.initial_exex_head.block = new_exex_head;
345
346        // Return an inverted notification. See the documentation for
347        // `ExExNotification::into_inverted`.
348        Ok(Some(notification.into_inverted()))
349    }
350
351    /// Compares the node head against the ExEx head, and backfills if needed.
352    ///
353    /// CAUTON: This method assumes that the ExEx head is <= the node head, and that it's on the
354    /// canonical chain.
355    ///
356    /// Possible situations are:
357    /// - ExEx is behind the node head (`node_head.number < exex_head.number`). Backfill from the
358    ///   node database.
359    /// - ExEx is at the same block number as the node head (`node_head.number ==
360    ///   exex_head.number`). Nothing to do.
361    fn check_backfill(&mut self) -> eyre::Result<()> {
362        let backfill_job_factory =
363            BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
364        match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
365            std::cmp::Ordering::Less => {
366                // ExEx is behind the node head, start backfill
367                debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
368                let backfill = backfill_job_factory
369                    .backfill(
370                        self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
371                    )
372                    .into_stream();
373                self.backfill_job = Some(backfill);
374            }
375            std::cmp::Ordering::Equal => {
376                debug!(target: "exex::notifications", "ExEx is at the node head");
377            }
378            std::cmp::Ordering::Greater => {
379                debug!(target: "exex::notifications", "ExEx is ahead of the node head");
380            }
381        };
382
383        Ok(())
384    }
385}
386
387impl<P, E> Stream for ExExNotificationsWithHead<P, E>
388where
389    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
390    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
391{
392    type Item = eyre::Result<ExExNotification<E::Primitives>>;
393
394    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
395        let this = self.get_mut();
396
397        // 1. Check once whether we need to retrieve a notification gap from the WAL.
398        if this.pending_check_canonical {
399            if let Some(canonical_notification) = this.check_canonical()? {
400                return Poll::Ready(Some(Ok(canonical_notification)))
401            }
402
403            // ExEx head is on the canonical chain, we no longer need to check it
404            this.pending_check_canonical = false;
405        }
406
407        // 2. Check once whether we need to trigger backfill sync
408        if this.pending_check_backfill {
409            this.check_backfill()?;
410            this.pending_check_backfill = false;
411        }
412
413        // 3. If backfill is in progress yield new notifications
414        if let Some(backfill_job) = &mut this.backfill_job {
415            debug!(target: "exex::notifications", "Polling backfill job");
416            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
417                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
418                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
419                    new: Arc::new(chain),
420                })))
421            }
422
423            // Backfill job is done, remove it
424            this.backfill_job = None;
425        }
426
427        // 4. Otherwise advance the regular event stream
428        loop {
429            let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
430                return Poll::Ready(None)
431            };
432
433            // 5. In case the exex is ahead of the new tip, we must skip it
434            if let Some(committed) = notification.committed_chain() {
435                // inclusive check because we should start with `exex.head + 1`
436                if this.initial_exex_head.block.number >= committed.tip().number() {
437                    continue
438                }
439            }
440
441            return Poll::Ready(Some(Ok(notification)))
442        }
443    }
444}
445
446#[cfg(test)]
447mod tests {
448    use super::*;
449    use crate::Wal;
450    use alloy_consensus::Header;
451    use alloy_eips::BlockNumHash;
452    use eyre::OptionExt;
453    use futures::StreamExt;
454    use reth_db_common::init::init_genesis;
455    use reth_ethereum_primitives::Block;
456    use reth_evm_ethereum::execute::EthExecutorProvider;
457    use reth_primitives_traits::Block as _;
458    use reth_provider::{
459        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
460        Chain, DatabaseProviderFactory, StorageLocation,
461    };
462    use reth_testing_utils::generators::{self, random_block, BlockParams};
463    use tokio::sync::mpsc;
464
465    #[tokio::test(flavor = "multi_thread")]
466    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
467        let mut rng = generators::rng();
468
469        let temp_dir = tempfile::tempdir().unwrap();
470        let wal = Wal::new(temp_dir.path()).unwrap();
471
472        let provider_factory = create_test_provider_factory();
473        let genesis_hash = init_genesis(&provider_factory)?;
474        let genesis_block = provider_factory
475            .block(genesis_hash.into())?
476            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
477
478        let provider = BlockchainProvider::new(provider_factory.clone())?;
479
480        let node_head_block = random_block(
481            &mut rng,
482            genesis_block.number + 1,
483            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
484        );
485        let provider_rw = provider_factory.provider_rw()?;
486        provider_rw
487            .insert_block(node_head_block.clone().try_recover()?, StorageLocation::Database)?;
488        provider_rw.commit()?;
489
490        let node_head = node_head_block.num_hash();
491        let exex_head =
492            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
493
494        let notification = ExExNotification::ChainCommitted {
495            new: Arc::new(Chain::new(
496                vec![random_block(
497                    &mut rng,
498                    node_head.number + 1,
499                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
500                )
501                .try_recover()?],
502                Default::default(),
503                None,
504            )),
505        };
506
507        let (notifications_tx, notifications_rx) = mpsc::channel(1);
508
509        notifications_tx.send(notification.clone()).await?;
510
511        let mut notifications = ExExNotificationsWithoutHead::new(
512            node_head,
513            provider,
514            EthExecutorProvider::mainnet(),
515            notifications_rx,
516            wal.handle(),
517        )
518        .with_head(exex_head);
519
520        // First notification is the backfill of missing blocks from the canonical chain
521        assert_eq!(
522            notifications.next().await.transpose()?,
523            Some(ExExNotification::ChainCommitted {
524                new: Arc::new(
525                    BackfillJobFactory::new(
526                        notifications.evm_config.clone(),
527                        notifications.provider.clone()
528                    )
529                    .backfill(1..=1)
530                    .next()
531                    .ok_or_eyre("failed to backfill")??
532                )
533            })
534        );
535
536        // Second notification is the actual notification that we sent before
537        assert_eq!(notifications.next().await.transpose()?, Some(notification));
538
539        Ok(())
540    }
541
542    #[tokio::test]
543    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
544        let temp_dir = tempfile::tempdir().unwrap();
545        let wal = Wal::new(temp_dir.path()).unwrap();
546
547        let provider_factory = create_test_provider_factory();
548        let genesis_hash = init_genesis(&provider_factory)?;
549        let genesis_block = provider_factory
550            .block(genesis_hash.into())?
551            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
552
553        let provider = BlockchainProvider::new(provider_factory)?;
554
555        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
556        let exex_head = ExExHead { block: node_head };
557
558        let notification = ExExNotification::ChainCommitted {
559            new: Arc::new(Chain::new(
560                vec![Block {
561                    header: Header {
562                        parent_hash: node_head.hash,
563                        number: node_head.number + 1,
564                        ..Default::default()
565                    },
566                    ..Default::default()
567                }
568                .seal_slow()
569                .try_recover()?],
570                Default::default(),
571                None,
572            )),
573        };
574
575        let (notifications_tx, notifications_rx) = mpsc::channel(1);
576
577        notifications_tx.send(notification.clone()).await?;
578
579        let mut notifications = ExExNotificationsWithoutHead::new(
580            node_head,
581            provider,
582            EthExecutorProvider::mainnet(),
583            notifications_rx,
584            wal.handle(),
585        )
586        .with_head(exex_head);
587
588        let new_notification = notifications.next().await.transpose()?;
589        assert_eq!(new_notification, Some(notification));
590
591        Ok(())
592    }
593
594    #[tokio::test(flavor = "multi_thread")]
595    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
596        let mut rng = generators::rng();
597
598        let temp_dir = tempfile::tempdir().unwrap();
599        let wal = Wal::new(temp_dir.path()).unwrap();
600
601        let provider_factory = create_test_provider_factory();
602        let genesis_hash = init_genesis(&provider_factory)?;
603        let genesis_block = provider_factory
604            .block(genesis_hash.into())?
605            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
606
607        let provider = BlockchainProvider::new(provider_factory)?;
608
609        let node_head_block = random_block(
610            &mut rng,
611            genesis_block.number + 1,
612            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
613        )
614        .try_recover()?;
615        let node_head = node_head_block.num_hash();
616        let provider_rw = provider.database_provider_rw()?;
617        provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
618        provider_rw.commit()?;
619        let node_head_notification = ExExNotification::ChainCommitted {
620            new: Arc::new(
621                BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
622                    .backfill(node_head.number..=node_head.number)
623                    .next()
624                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
625            ),
626        };
627
628        let exex_head_block = random_block(
629            &mut rng,
630            genesis_block.number + 1,
631            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
632        );
633        let exex_head = ExExHead { block: exex_head_block.num_hash() };
634        let exex_head_notification = ExExNotification::ChainCommitted {
635            new: Arc::new(Chain::new(
636                vec![exex_head_block.clone().try_recover()?],
637                Default::default(),
638                None,
639            )),
640        };
641        wal.commit(&exex_head_notification)?;
642
643        let new_notification = ExExNotification::ChainCommitted {
644            new: Arc::new(Chain::new(
645                vec![random_block(
646                    &mut rng,
647                    node_head.number + 1,
648                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
649                )
650                .try_recover()?],
651                Default::default(),
652                None,
653            )),
654        };
655
656        let (notifications_tx, notifications_rx) = mpsc::channel(1);
657
658        notifications_tx.send(new_notification.clone()).await?;
659
660        let mut notifications = ExExNotificationsWithoutHead::new(
661            node_head,
662            provider,
663            EthExecutorProvider::mainnet(),
664            notifications_rx,
665            wal.handle(),
666        )
667        .with_head(exex_head);
668
669        // First notification is the revert of the ExEx head block to get back to the canonical
670        // chain
671        assert_eq!(
672            notifications.next().await.transpose()?,
673            Some(exex_head_notification.into_inverted())
674        );
675        // Second notification is the backfilled block from the canonical chain to get back to the
676        // canonical tip
677        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
678        // Third notification is the actual notification that we sent before
679        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
680
681        Ok(())
682    }
683
684    #[tokio::test]
685    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
686        reth_tracing::init_test_tracing();
687        let mut rng = generators::rng();
688
689        let temp_dir = tempfile::tempdir().unwrap();
690        let wal = Wal::new(temp_dir.path()).unwrap();
691
692        let provider_factory = create_test_provider_factory();
693        let genesis_hash = init_genesis(&provider_factory)?;
694        let genesis_block = provider_factory
695            .block(genesis_hash.into())?
696            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
697
698        let provider = BlockchainProvider::new(provider_factory)?;
699
700        let exex_head_block = random_block(
701            &mut rng,
702            genesis_block.number + 1,
703            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
704        );
705        let exex_head_notification = ExExNotification::ChainCommitted {
706            new: Arc::new(Chain::new(
707                vec![exex_head_block.clone().try_recover()?],
708                Default::default(),
709                None,
710            )),
711        };
712        wal.commit(&exex_head_notification)?;
713
714        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
715        let exex_head = ExExHead {
716            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
717        };
718
719        let new_notification = ExExNotification::ChainCommitted {
720            new: Arc::new(Chain::new(
721                vec![random_block(
722                    &mut rng,
723                    genesis_block.number + 1,
724                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
725                )
726                .try_recover()?],
727                Default::default(),
728                None,
729            )),
730        };
731
732        let (notifications_tx, notifications_rx) = mpsc::channel(1);
733
734        notifications_tx.send(new_notification.clone()).await?;
735
736        let mut notifications = ExExNotificationsWithoutHead::new(
737            node_head,
738            provider,
739            EthExecutorProvider::mainnet(),
740            notifications_rx,
741            wal.handle(),
742        )
743        .with_head(exex_head);
744
745        // First notification is the revert of the ExEx head block to get back to the canonical
746        // chain
747        assert_eq!(
748            notifications.next().await.transpose()?,
749            Some(exex_head_notification.into_inverted())
750        );
751
752        // Second notification is the actual notification that we sent before
753        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
754
755        Ok(())
756    }
757}