1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12 fmt::Debug,
13 pin::Pin,
14 sync::Arc,
15 task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25 E: ConfigureEvm,
26{
27 inner: ExExNotificationsInner<P, E>,
28}
29
30pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
34 Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
35{
36 fn set_without_head(&mut self);
42
43 fn set_with_head(&mut self, exex_head: ExExHead);
50
51 fn without_head(self) -> Self
55 where
56 Self: Sized;
57
58 fn with_head(self, exex_head: ExExHead) -> Self
62 where
63 Self: Sized;
64}
65
66#[derive(Debug)]
67enum ExExNotificationsInner<P, E>
68where
69 E: ConfigureEvm,
70{
71 WithoutHead(ExExNotificationsWithoutHead<P, E>),
73 WithHead(Box<ExExNotificationsWithHead<P, E>>),
76 Invalid,
79}
80
81impl<P, E> ExExNotifications<P, E>
82where
83 E: ConfigureEvm,
84{
85 pub const fn new(
87 node_head: BlockNumHash,
88 provider: P,
89 evm_config: E,
90 notifications: Receiver<ExExNotification<E::Primitives>>,
91 wal_handle: WalHandle<E::Primitives>,
92 ) -> Self {
93 Self {
94 inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
95 node_head,
96 provider,
97 evm_config,
98 notifications,
99 wal_handle,
100 )),
101 }
102 }
103}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
109{
110 fn set_without_head(&mut self) {
111 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
112 self.inner = ExExNotificationsInner::WithoutHead(match current {
113 ExExNotificationsInner::WithoutHead(notifications) => notifications,
114 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
115 notifications.initial_local_head,
116 notifications.provider,
117 notifications.evm_config,
118 notifications.notifications,
119 notifications.wal_handle,
120 ),
121 ExExNotificationsInner::Invalid => unreachable!(),
122 });
123 }
124
125 fn set_with_head(&mut self, exex_head: ExExHead) {
126 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
127 self.inner = ExExNotificationsInner::WithHead(match current {
128 ExExNotificationsInner::WithoutHead(notifications) => {
129 Box::new(notifications.with_head(exex_head))
130 }
131 ExExNotificationsInner::WithHead(notifications) => {
132 Box::new(ExExNotificationsWithHead::new(
133 notifications.initial_local_head,
134 notifications.provider,
135 notifications.evm_config,
136 notifications.notifications,
137 notifications.wal_handle,
138 exex_head,
139 ))
140 }
141 ExExNotificationsInner::Invalid => unreachable!(),
142 });
143 }
144
145 fn without_head(mut self) -> Self {
146 self.set_without_head();
147 self
148 }
149
150 fn with_head(mut self, exex_head: ExExHead) -> Self {
151 self.set_with_head(exex_head);
152 self
153 }
154}
155
156impl<P, E> Stream for ExExNotifications<P, E>
157where
158 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
159 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
160{
161 type Item = eyre::Result<ExExNotification<E::Primitives>>;
162
163 fn poll_next(
164 self: std::pin::Pin<&mut Self>,
165 cx: &mut std::task::Context<'_>,
166 ) -> std::task::Poll<Option<Self::Item>> {
167 match &mut self.get_mut().inner {
168 ExExNotificationsInner::WithoutHead(notifications) => {
169 notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
170 }
171 ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
172 ExExNotificationsInner::Invalid => unreachable!(),
173 }
174 }
175}
176
177pub struct ExExNotificationsWithoutHead<P, E>
179where
180 E: ConfigureEvm,
181{
182 node_head: BlockNumHash,
183 provider: P,
184 evm_config: E,
185 notifications: Receiver<ExExNotification<E::Primitives>>,
186 wal_handle: WalHandle<E::Primitives>,
187}
188
189impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
190where
191 E: ConfigureEvm + Debug,
192{
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("ExExNotifications")
195 .field("provider", &self.provider)
196 .field("evm_config", &self.evm_config)
197 .field("notifications", &self.notifications)
198 .finish()
199 }
200}
201
202impl<P, E> ExExNotificationsWithoutHead<P, E>
203where
204 E: ConfigureEvm,
205{
206 const fn new(
208 node_head: BlockNumHash,
209 provider: P,
210 evm_config: E,
211 notifications: Receiver<ExExNotification<E::Primitives>>,
212 wal_handle: WalHandle<E::Primitives>,
213 ) -> Self {
214 Self { node_head, provider, evm_config, notifications, wal_handle }
215 }
216
217 fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
219 ExExNotificationsWithHead::new(
220 self.node_head,
221 self.provider,
222 self.evm_config,
223 self.notifications,
224 self.wal_handle,
225 head,
226 )
227 }
228}
229
230impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
231where
232 E: ConfigureEvm,
233{
234 type Item = ExExNotification<E::Primitives>;
235
236 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
237 self.get_mut().notifications.poll_recv(cx)
238 }
239}
240
241#[derive(Debug)]
250pub struct ExExNotificationsWithHead<P, E>
251where
252 E: ConfigureEvm,
253{
254 initial_local_head: BlockNumHash,
256 provider: P,
257 evm_config: E,
258 notifications: Receiver<ExExNotification<E::Primitives>>,
259 wal_handle: WalHandle<E::Primitives>,
260 initial_exex_head: ExExHead,
262
263 pending_check_canonical: bool,
266 pending_check_backfill: bool,
269 backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
271}
272
273impl<P, E> ExExNotificationsWithHead<P, E>
274where
275 E: ConfigureEvm,
276{
277 const fn new(
279 node_head: BlockNumHash,
280 provider: P,
281 evm_config: E,
282 notifications: Receiver<ExExNotification<E::Primitives>>,
283 wal_handle: WalHandle<E::Primitives>,
284 exex_head: ExExHead,
285 ) -> Self {
286 Self {
287 initial_local_head: node_head,
288 provider,
289 evm_config,
290 notifications,
291 wal_handle,
292 initial_exex_head: exex_head,
293 pending_check_canonical: true,
294 pending_check_backfill: true,
295 backfill_job: None,
296 }
297 }
298}
299
300impl<P, E> ExExNotificationsWithHead<P, E>
301where
302 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
303 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
304{
305 fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
311 if self.provider.is_known(&self.initial_exex_head.block.hash)? &&
312 self.initial_exex_head.block.number <= self.initial_local_head.number
313 {
314 debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
316 return Ok(None)
317 }
318
319 let Some(notification) = self
324 .wal_handle
325 .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
326 else {
327 if self.initial_exex_head.block.number > self.initial_local_head.number {
329 debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
330 return Ok(None);
331 }
332
333 return Err(eyre::eyre!(
334 "Could not find notification for block hash {:?} in the WAL",
335 self.initial_exex_head.block.hash
336 ))
337 };
338
339 let committed_chain = notification.committed_chain().unwrap();
341 let new_exex_head =
342 (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
343 debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
344 self.initial_exex_head.block = new_exex_head;
345
346 Ok(Some(notification.into_inverted()))
349 }
350
351 fn check_backfill(&mut self) -> eyre::Result<()> {
362 let backfill_job_factory =
363 BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
364 match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
365 std::cmp::Ordering::Less => {
366 debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
368 let backfill = backfill_job_factory
369 .backfill(
370 self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
371 )
372 .into_stream();
373 self.backfill_job = Some(backfill);
374 }
375 std::cmp::Ordering::Equal => {
376 debug!(target: "exex::notifications", "ExEx is at the node head");
377 }
378 std::cmp::Ordering::Greater => {
379 debug!(target: "exex::notifications", "ExEx is ahead of the node head");
380 }
381 };
382
383 Ok(())
384 }
385}
386
387impl<P, E> Stream for ExExNotificationsWithHead<P, E>
388where
389 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
390 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
391{
392 type Item = eyre::Result<ExExNotification<E::Primitives>>;
393
394 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
395 let this = self.get_mut();
396
397 if this.pending_check_canonical {
399 if let Some(canonical_notification) = this.check_canonical()? {
400 return Poll::Ready(Some(Ok(canonical_notification)))
401 }
402
403 this.pending_check_canonical = false;
405 }
406
407 if this.pending_check_backfill {
409 this.check_backfill()?;
410 this.pending_check_backfill = false;
411 }
412
413 if let Some(backfill_job) = &mut this.backfill_job {
415 debug!(target: "exex::notifications", "Polling backfill job");
416 if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
417 debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
418 return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
419 new: Arc::new(chain),
420 })))
421 }
422
423 this.backfill_job = None;
425 }
426
427 loop {
429 let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
430 return Poll::Ready(None)
431 };
432
433 if let Some(committed) = notification.committed_chain() {
435 if this.initial_exex_head.block.number >= committed.tip().number() {
437 continue
438 }
439 }
440
441 return Poll::Ready(Some(Ok(notification)))
442 }
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Wal;
    use alloy_consensus::Header;
    use alloy_eips::BlockNumHash;
    use eyre::OptionExt;
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::Block;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::Block as _;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
        Chain, DatabaseProviderFactory, StorageLocation,
    };
    use reth_testing_utils::generators::{self, random_block, BlockParams};
    use tokio::sync::mpsc;

    /// The ExEx head is the genesis block and the node head is one block above it: the stream
    /// must first backfill the missing block and then forward the notification from the channel.
    #[tokio::test(flavor = "multi_thread")]
    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block =
            provider_factory.block(genesis_hash.into())?.ok_or_eyre("genesis block not found")?;

        let provider = BlockchainProvider::new(provider_factory.clone())?;

        // Store the node head block (genesis + 1) in the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let provider_rw = provider_factory.provider_rw()?;
        provider_rw
            .insert_block(node_head_block.clone().try_recover()?, StorageLocation::Database)?;
        provider_rw.commit()?;

        let node_head = node_head_block.num_hash();
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // A notification for a block on top of the node head, queued on the channel up front.
        let next_block = random_block(
            &mut rng,
            node_head.number + 1,
            BlockParams { parent: Some(node_head.hash), ..Default::default() },
        )
        .try_recover()?;
        let channel_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![next_block], Default::default(), None)),
        };

        let (tx, rx) = mpsc::channel(1);
        tx.send(channel_notification.clone()).await?;

        let mut stream = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First comes the backfill of the block the ExEx is missing.
        let first = stream.next().await.transpose()?;
        let backfilled_chain =
            BackfillJobFactory::new(stream.evm_config.clone(), stream.provider.clone())
                .backfill(1..=1)
                .next()
                .ok_or_eyre("failed to backfill")??;
        assert_eq!(
            first,
            Some(ExExNotification::ChainCommitted { new: Arc::new(backfilled_chain) })
        );

        // Then the notification that was sent through the channel.
        let second = stream.next().await.transpose()?;
        assert_eq!(second, Some(channel_notification));

        Ok(())
    }

    /// The ExEx head equals the node head (both genesis): the notification from the channel is
    /// forwarded right away.
    #[tokio::test]
    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block =
            provider_factory.block(genesis_hash.into())?.ok_or_eyre("genesis block not found")?;

        let provider = BlockchainProvider::new(provider_factory)?;

        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead { block: node_head };

        // A block with default contents on top of the node head.
        let next_block = Block {
            header: Header {
                parent_hash: node_head.hash,
                number: node_head.number + 1,
                ..Default::default()
            },
            ..Default::default()
        }
        .seal_slow()
        .try_recover()?;
        let channel_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![next_block], Default::default(), None)),
        };

        let (tx, rx) = mpsc::channel(1);
        tx.send(channel_notification.clone()).await?;

        let mut stream = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            rx,
            wal.handle(),
        )
        .with_head(exex_head);

        let received = stream.next().await.transpose()?;
        assert_eq!(received, Some(channel_notification));

        Ok(())
    }

    /// The ExEx head is at the node head's height but is a different block that is only present
    /// in the WAL: the stream must yield the inverted WAL notification, then backfill the node
    /// head block, then forward the notification from the channel.
    #[tokio::test(flavor = "multi_thread")]
    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block =
            provider_factory.block(genesis_hash.into())?.ok_or_eyre("genesis block not found")?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // Store the node head block (genesis + 1) in the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = provider.database_provider_rw()?;
        provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
        provider_rw.commit()?;

        // The notification a backfill of the node head block is expected to produce.
        let node_head_chain =
            BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
                .backfill(node_head.number..=node_head.number)
                .next()
                .ok_or_eyre("failed to backfill")??;
        let node_head_notification =
            ExExNotification::ChainCommitted { new: Arc::new(node_head_chain) };

        // A second block at the same height; it is committed to the WAL only, not the database.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head = ExExHead { block: exex_head_block.num_hash() };
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // A notification for a block on top of the node head, queued on the channel up front.
        let next_block = random_block(
            &mut rng,
            node_head.number + 1,
            BlockParams { parent: Some(node_head.hash), ..Default::default() },
        )
        .try_recover()?;
        let channel_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![next_block], Default::default(), None)),
        };

        let (tx, rx) = mpsc::channel(1);
        tx.send(channel_notification.clone()).await?;

        let mut stream = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // 1. The inverted notification for the ExEx head block found in the WAL.
        let first = stream.next().await.transpose()?;
        assert_eq!(first, Some(exex_head_notification.into_inverted()));

        // 2. The backfill of the node head block.
        let second = stream.next().await.transpose()?;
        assert_eq!(second, Some(node_head_notification));

        // 3. The notification that was sent through the channel.
        let third = stream.next().await.transpose()?;
        assert_eq!(third, Some(channel_notification));

        Ok(())
    }

    /// The ExEx head is one block above the node head (genesis) and only present in the WAL:
    /// the stream must yield the inverted WAL notification and then forward the notification
    /// from the channel.
    #[tokio::test]
    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block =
            provider_factory.block(genesis_hash.into())?.ok_or_eyre("genesis block not found")?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // The ExEx head block is committed to the WAL only, not the database.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead { block: exex_head_block.num_hash() };

        // A notification for another block on top of genesis, queued on the channel up front.
        let next_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), ..Default::default() },
        )
        .try_recover()?;
        let channel_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![next_block], Default::default(), None)),
        };

        let (tx, rx) = mpsc::channel(1);
        tx.send(channel_notification.clone()).await?;

        let mut stream = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // 1. The inverted notification for the ExEx head block found in the WAL.
        let first = stream.next().await.transpose()?;
        assert_eq!(first, Some(exex_head_notification.into_inverted()));

        // 2. The notification that was sent through the channel.
        let second = stream.next().await.transpose()?;
        assert_eq!(second, Some(channel_notification));

        Ok(())
    }
}