1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use futures::{Stream, StreamExt};
4use reth_chainspec::Head;
5use reth_evm::execute::BlockExecutorProvider;
6use reth_exex_types::ExExHead;
7use reth_node_api::NodePrimitives;
8use reth_primitives::EthPrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12 fmt::Debug,
13 pin::Pin,
14 sync::Arc,
15 task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
/// A stream of [`ExExNotification`]s.
///
/// Internally this wraps one of two streams: one that forwards notifications from the channel
/// as they arrive, and one that is configured with an [`ExExHead`]. The
/// [`ExExNotificationsStream`] methods switch between the two.
#[derive(Debug)]
pub struct ExExNotifications<P, E>
where
    E: BlockExecutorProvider,
{
    /// The currently selected inner stream.
    inner: ExExNotificationsInner<P, E>,
}
29
/// A [`Stream`] of fallible [`ExExNotification`]s that can be switched between a mode with an
/// [`ExExHead`] and a mode without one.
///
/// Every method comes in two flavours: a `set_*` variant that mutates the stream in place and a
/// by-value variant that consumes the stream and returns it.
pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
{
    /// Switches the stream, in place, to the mode without an ExEx head.
    fn set_without_head(&mut self);

    /// Switches the stream, in place, to the mode configured with the given `exex_head`.
    fn set_with_head(&mut self, exex_head: ExExHead);

    /// Consumes the stream and returns it in the mode without an ExEx head.
    fn without_head(self) -> Self
    where
        Self: Sized;

    /// Consumes the stream and returns it in the mode configured with the given `exex_head`.
    fn with_head(self, exex_head: ExExHead) -> Self
    where
        Self: Sized;
}
65
/// The stream currently backing an [`ExExNotifications`].
#[derive(Debug)]
enum ExExNotificationsInner<P, E>
where
    E: BlockExecutorProvider,
{
    /// A stream that is not configured with an ExEx head.
    WithoutHead(ExExNotificationsWithoutHead<P, E>),
    /// A stream that is configured with an ExEx head.
    WithHead(ExExNotificationsWithHead<P, E>),
    /// Placeholder stored while the real value is moved out with `std::mem::replace` during a
    /// mode switch. It is always overwritten before the switch returns; observing it anywhere
    /// else hits `unreachable!()`.
    Invalid,
}
80
81impl<P, E> ExExNotifications<P, E>
82where
83 E: BlockExecutorProvider,
84{
85 pub const fn new(
87 node_head: Head,
88 provider: P,
89 executor: E,
90 notifications: Receiver<ExExNotification<E::Primitives>>,
91 wal_handle: WalHandle<E::Primitives>,
92 ) -> Self {
93 Self {
94 inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
95 node_head,
96 provider,
97 executor,
98 notifications,
99 wal_handle,
100 )),
101 }
102 }
103}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
109 + Clone
110 + Unpin
111 + 'static,
112{
113 fn set_without_head(&mut self) {
114 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
115 self.inner = ExExNotificationsInner::WithoutHead(match current {
116 ExExNotificationsInner::WithoutHead(notifications) => notifications,
117 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
118 notifications.node_head,
119 notifications.provider,
120 notifications.executor,
121 notifications.notifications,
122 notifications.wal_handle,
123 ),
124 ExExNotificationsInner::Invalid => unreachable!(),
125 });
126 }
127
128 fn set_with_head(&mut self, exex_head: ExExHead) {
129 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
130 self.inner = ExExNotificationsInner::WithHead(match current {
131 ExExNotificationsInner::WithoutHead(notifications) => {
132 notifications.with_head(exex_head)
133 }
134 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new(
135 notifications.node_head,
136 notifications.provider,
137 notifications.executor,
138 notifications.notifications,
139 notifications.wal_handle,
140 exex_head,
141 ),
142 ExExNotificationsInner::Invalid => unreachable!(),
143 });
144 }
145
146 fn without_head(mut self) -> Self {
147 self.set_without_head();
148 self
149 }
150
151 fn with_head(mut self, exex_head: ExExHead) -> Self {
152 self.set_with_head(exex_head);
153 self
154 }
155}
156
157impl<P, E> Stream for ExExNotifications<P, E>
158where
159 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
160 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
161 + Clone
162 + Unpin
163 + 'static,
164{
165 type Item = eyre::Result<ExExNotification<E::Primitives>>;
166
167 fn poll_next(
168 self: std::pin::Pin<&mut Self>,
169 cx: &mut std::task::Context<'_>,
170 ) -> std::task::Poll<Option<Self::Item>> {
171 match &mut self.get_mut().inner {
172 ExExNotificationsInner::WithoutHead(notifications) => {
173 notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
174 }
175 ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
176 ExExNotificationsInner::Invalid => unreachable!(),
177 }
178 }
179}
180
/// A stream of [`ExExNotification`]s that forwards whatever arrives on the notifications
/// channel, without taking an ExEx head into account.
///
/// `node_head`, `provider`, `executor` and `wal_handle` are not used by this stream itself; they
/// are carried along so the stream can be converted into an [`ExExNotificationsWithHead`].
pub struct ExExNotificationsWithoutHead<P, E>
where
    E: BlockExecutorProvider,
{
    /// The node head supplied at construction.
    node_head: Head,
    /// The provider supplied at construction.
    provider: P,
    /// The block executor provider supplied at construction.
    executor: E,
    /// Receiving half of the channel that delivers notifications.
    notifications: Receiver<ExExNotification<E::Primitives>>,
    /// Handle to the WAL supplied at construction.
    wal_handle: WalHandle<E::Primitives>,
}
192
193impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
194where
195 E: Debug + BlockExecutorProvider,
196{
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 f.debug_struct("ExExNotifications")
199 .field("provider", &self.provider)
200 .field("executor", &self.executor)
201 .field("notifications", &self.notifications)
202 .finish()
203 }
204}
205
206impl<P, E> ExExNotificationsWithoutHead<P, E>
207where
208 E: BlockExecutorProvider,
209{
210 const fn new(
212 node_head: Head,
213 provider: P,
214 executor: E,
215 notifications: Receiver<ExExNotification<E::Primitives>>,
216 wal_handle: WalHandle<E::Primitives>,
217 ) -> Self {
218 Self { node_head, provider, executor, notifications, wal_handle }
219 }
220
221 fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
223 ExExNotificationsWithHead::new(
224 self.node_head,
225 self.provider,
226 self.executor,
227 self.notifications,
228 self.wal_handle,
229 head,
230 )
231 }
232}
233
234impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
235where
236 E: Unpin + BlockExecutorProvider,
237{
238 type Item = ExExNotification<E::Primitives>;
239
240 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
241 self.get_mut().notifications.poll_recv(cx)
242 }
243}
244
/// A stream of [`ExExNotification`]s that takes the ExEx head into account.
///
/// Before forwarding notifications from the channel, the stream:
/// 1. Checks whether the ExEx head is on the canonical chain. While it is not, it yields the
///    inverted form of the notification stored in the WAL for the head block and moves the head
///    to that notification's parent block.
/// 2. Starts a backfill job for the blocks between the ExEx head and the node head if the ExEx
///    head is behind, and yields the backfilled chains as `ChainCommitted` notifications.
#[derive(Debug)]
pub struct ExExNotificationsWithHead<P, E>
where
    E: BlockExecutorProvider,
{
    /// The node head the ExEx head is compared against.
    node_head: Head,
    /// Provider used to look up the ExEx head block and to build the backfill job.
    provider: P,
    /// Block executor provider used to build the backfill job.
    executor: E,
    /// Receiving half of the channel that delivers notifications.
    notifications: Receiver<ExExNotification<E::Primitives>>,
    /// Handle to the WAL, queried for the committed notification of the ExEx head block.
    wal_handle: WalHandle<E::Primitives>,
    /// The current ExEx head. Updated by the canonical check and by every notification that is
    /// forwarded from the channel.
    exex_head: ExExHead,
    /// `true` until the canonical check reports that the ExEx head is on the canonical chain.
    pending_check_canonical: bool,
    /// `true` until the backfill check has run successfully once.
    pending_check_backfill: bool,
    /// The running backfill job, if any. Cleared once the job's stream is exhausted.
    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
}
273
274impl<P, E> ExExNotificationsWithHead<P, E>
275where
276 E: BlockExecutorProvider,
277{
278 const fn new(
280 node_head: Head,
281 provider: P,
282 executor: E,
283 notifications: Receiver<ExExNotification<E::Primitives>>,
284 wal_handle: WalHandle<E::Primitives>,
285 exex_head: ExExHead,
286 ) -> Self {
287 Self {
288 node_head,
289 provider,
290 executor,
291 notifications,
292 wal_handle,
293 exex_head,
294 pending_check_canonical: true,
295 pending_check_backfill: true,
296 backfill_job: None,
297 }
298 }
299}
300
301impl<P, E> ExExNotificationsWithHead<P, E>
302where
303 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
304 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
305 + Clone
306 + Unpin
307 + 'static,
308{
309 fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
315 if self.provider.is_known(&self.exex_head.block.hash)? &&
316 self.exex_head.block.number <= self.node_head.number
317 {
318 debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
319 return Ok(None)
320 }
321
322 let Some(notification) =
327 self.wal_handle.get_committed_notification_by_block_hash(&self.exex_head.block.hash)?
328 else {
329 return Err(eyre::eyre!(
330 "Could not find notification for block hash {:?} in the WAL",
331 self.exex_head.block.hash
332 ))
333 };
334
335 let committed_chain = notification.committed_chain().unwrap();
337 let new_exex_head =
338 (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
339 debug!(target: "exex::notifications", old_exex_head = ?self.exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
340 self.exex_head.block = new_exex_head;
341
342 Ok(Some(notification.into_inverted()))
345 }
346
347 fn check_backfill(&mut self) -> eyre::Result<()> {
358 let backfill_job_factory =
359 BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
360 match self.exex_head.block.number.cmp(&self.node_head.number) {
361 std::cmp::Ordering::Less => {
362 debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
364 let backfill = backfill_job_factory
365 .backfill(self.exex_head.block.number + 1..=self.node_head.number)
366 .into_stream();
367 self.backfill_job = Some(backfill);
368 }
369 std::cmp::Ordering::Equal => {
370 debug!(target: "exex::notifications", "ExEx is at the node head");
371 }
372 std::cmp::Ordering::Greater => {
373 return Err(eyre::eyre!("ExEx is ahead of the node head"))
374 }
375 };
376
377 Ok(())
378 }
379}
380
381impl<P, E> Stream for ExExNotificationsWithHead<P, E>
382where
383 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
384 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
385 + Clone
386 + Unpin
387 + 'static,
388{
389 type Item = eyre::Result<ExExNotification<E::Primitives>>;
390
391 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
392 let this = self.get_mut();
393
394 if this.pending_check_canonical {
395 if let Some(canonical_notification) = this.check_canonical()? {
396 return Poll::Ready(Some(Ok(canonical_notification)))
397 }
398
399 this.pending_check_canonical = false;
401 }
402
403 if this.pending_check_backfill {
404 this.check_backfill()?;
405 this.pending_check_backfill = false;
406 }
407
408 if let Some(backfill_job) = &mut this.backfill_job {
409 debug!(target: "exex::notifications", "Polling backfill job");
410 if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
411 debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
412 return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
413 new: Arc::new(chain),
414 })))
415 }
416
417 this.backfill_job = None;
419 }
420
421 let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
422 return Poll::Ready(None)
423 };
424
425 if let Some(committed_chain) = notification.committed_chain() {
426 this.exex_head.block = committed_chain.tip().num_hash();
427 } else if let Some(reverted_chain) = notification.reverted_chain() {
428 let first_block = reverted_chain.first();
429 this.exex_head.block = (first_block.parent_hash(), first_block.number() - 1).into();
430 }
431
432 Poll::Ready(Some(Ok(notification)))
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use crate::Wal;
439
440 use super::*;
441 use alloy_consensus::Header;
442 use alloy_eips::BlockNumHash;
443 use eyre::OptionExt;
444 use futures::StreamExt;
445 use reth_db_common::init::init_genesis;
446 use reth_evm_ethereum::execute::EthExecutorProvider;
447 use reth_primitives::{Block, BlockExt};
448 use reth_provider::{
449 providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockWriter,
450 Chain, DatabaseProviderFactory, StorageLocation,
451 };
452 use reth_testing_utils::generators::{self, random_block, BlockParams};
453 use tokio::sync::mpsc;
454
    /// The ExEx head is at genesis while the node head is at block 1, which is stored in the
    /// database. The stream must first yield the backfilled block 1 and only then the
    /// notification that was sent on the channel.
    #[tokio::test(flavor = "multi_thread")]
    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory.clone())?;

        // Insert the node head block (child of genesis, no transactions) into the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let provider_rw = provider_factory.provider_rw()?;
        provider_rw.insert_block(
            node_head_block.clone().seal_with_senders().ok_or_eyre("failed to recover senders")?,
            StorageLocation::Database,
        )?;
        provider_rw.commit()?;

        let node_head = Head {
            number: node_head_block.number,
            hash: node_head_block.hash(),
            ..Default::default()
        };
        // The ExEx head is one block behind the node head.
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // A notification for a new block on top of the node head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the backfill of the missing block 1.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(ExExNotification::ChainCommitted {
                new: Arc::new(
                    BackfillJobFactory::new(
                        notifications.executor.clone(),
                        notifications.provider.clone()
                    )
                    .backfill(1..=1)
                    .next()
                    .ok_or_eyre("failed to backfill")??
                )
            })
        );

        // Second notification is the one that was sent on the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(notification));

        Ok(())
    }
538
    /// The ExEx head and the node head are both at genesis. No revert and no backfill is
    /// needed, so the stream yields the notification from the channel right away.
    #[tokio::test]
    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory)?;

        // Both heads point at the genesis block.
        let node_head =
            Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
        let exex_head =
            ExExHead { block: BlockNumHash { number: node_head.number, hash: node_head.hash } };

        // A notification for a default block on top of the node head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![Block {
                    header: Header {
                        parent_hash: node_head.hash,
                        number: node_head.number + 1,
                        ..Default::default()
                    },
                    ..Default::default()
                }
                .seal_slow()
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // The only notification is the one that was sent on the channel.
        let new_notification = notifications.next().await.transpose()?;
        assert_eq!(new_notification, Some(notification));

        Ok(())
    }
593
    /// The ExEx head and the node head are at the same height but on different blocks: the node
    /// head block is in the database, the ExEx head block only exists in the WAL. The stream
    /// must yield the revert of the ExEx head block, then the backfilled node head block, then
    /// the notification that was sent on the channel.
    #[tokio::test(flavor = "multi_thread")]
    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory)?;

        // The node head block: a child of genesis that is written to the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .seal_with_senders::<reth_primitives::Block>()
        .ok_or_eyre("failed to recover senders")?;
        let node_head = Head {
            number: node_head_block.number,
            hash: node_head_block.hash(),
            ..Default::default()
        };
        let provider_rw = provider.database_provider_rw()?;
        provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
        provider_rw.commit()?;
        // The notification the backfill is expected to produce for the node head block.
        let node_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(
                BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
                    .backfill(node_head.number..=node_head.number)
                    .next()
                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
            ),
        };

        // The ExEx head block: another random child of genesis that is only committed to the
        // WAL, never to the database.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head = ExExHead { block: exex_head_block.num_hash() };
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block
                    .clone()
                    .seal_with_senders()
                    .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // A notification for a new block on top of the node head.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the revert of the ExEx head block, which brings the ExEx head
        // back to genesis.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );
        // Second notification is the backfilled node head block.
        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
        // Third notification is the one that was sent on the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
692
    /// The ExEx head is at block 1, which only exists in the WAL, while the node head is still
    /// at genesis. The stream must yield the revert of the ExEx head block, which moves the
    /// ExEx head to genesis, and then the notification that was sent on the channel.
    #[tokio::test]
    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider2::new(provider_factory)?;

        // The ExEx head block: a child of genesis that is only committed to the WAL.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block
                    .clone()
                    .seal_with_senders()
                    .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // The node head is at genesis, one block behind the ExEx head.
        let node_head =
            Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
        let exex_head = ExExHead {
            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
        };

        // A notification for another random child of genesis.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    genesis_block.number + 1,
                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
                )
                .seal_with_senders()
                .ok_or_eyre("failed to recover senders")?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the revert of the ExEx head block.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );

        // Second notification is the one that was sent on the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
771}