1use crate::{
2 wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_chainspec::Head;
11use reth_evm::execute::BlockExecutorProvider;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives::{EthPrimitives, SealedHeader};
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18 collections::VecDeque,
19 fmt::Debug,
20 future::{poll_fn, Future},
21 ops::Not,
22 pin::Pin,
23 sync::{
24 atomic::{AtomicUsize, Ordering},
25 Arc,
26 },
27 task::{ready, Context, Poll},
28};
29use tokio::sync::{
30 mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31 watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
/// Default capacity of the [`ExExManager`] notification buffer.
///
/// NOTE(review): this constant is not referenced elsewhere in this file; presumably callers
/// pass it as the `max_capacity` argument of [`ExExManager::new`] — confirm at the call sites.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;

/// Number of blocks in the WAL above which the manager logs a warning after finalizing it.
///
/// A WAL that stays above this size is not being cleared, which is typically a sign that an
/// ExEx does not emit the `FinishedHeight` event.
pub const WAL_BLOCKS_WARNING: usize = 128;
46
/// Origin of a notification handed to the [`ExExManager`].
///
/// The manager inspects the source to decide whether the notification has to be committed to
/// the WAL before it is buffered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
    /// The notification was sent from the pipeline; the manager skips the WAL commit for it.
    Pipeline,
    /// The notification was sent from the blockchain tree; the manager commits it to the WAL.
    BlockchainTree,
}
58
59#[derive(Metrics)]
61#[metrics(scope = "exex")]
62struct ExExMetrics {
63 notifications_sent_total: Counter,
65 events_sent_total: Counter,
67}
68
69#[derive(Debug)]
75pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
76 id: String,
78 metrics: ExExMetrics,
80 sender: PollSender<ExExNotification<N>>,
82 receiver: UnboundedReceiver<ExExEvent>,
84 next_notification_id: usize,
86 finished_height: Option<BlockNumHash>,
90}
91
92impl<N: NodePrimitives> ExExHandle<N> {
93 pub fn new<P, E: BlockExecutorProvider<Primitives = N>>(
98 id: String,
99 node_head: Head,
100 provider: P,
101 executor: E,
102 wal_handle: WalHandle<N>,
103 ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
104 let (notification_tx, notification_rx) = mpsc::channel(1);
105 let (event_tx, event_rx) = mpsc::unbounded_channel();
106 let notifications =
107 ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle);
108
109 (
110 Self {
111 id: id.clone(),
112 metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
113 sender: PollSender::new(notification_tx),
114 receiver: event_rx,
115 next_notification_id: 0,
116 finished_height: None,
117 },
118 event_tx,
119 notifications,
120 )
121 }
122
123 fn send(
128 &mut self,
129 cx: &mut Context<'_>,
130 (notification_id, notification): &(usize, ExExNotification<N>),
131 ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
132 if let Some(finished_height) = self.finished_height {
133 match notification {
134 ExExNotification::ChainCommitted { new } => {
135 if finished_height.number >= new.tip().number() {
139 debug!(
140 target: "exex::manager",
141 exex_id = %self.id,
142 %notification_id,
143 ?finished_height,
144 new_tip = %new.tip().number(),
145 "Skipping notification"
146 );
147
148 self.next_notification_id = notification_id + 1;
149 return Poll::Ready(Ok(()))
150 }
151 }
152 ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
157 }
158 }
159
160 debug!(
161 target: "exex::manager",
162 exex_id = %self.id,
163 %notification_id,
164 "Reserving slot for notification"
165 );
166 match self.sender.poll_reserve(cx) {
167 Poll::Ready(Ok(())) => (),
168 other => return other,
169 }
170
171 debug!(
172 target: "exex::manager",
173 exex_id = %self.id,
174 %notification_id,
175 "Sending notification"
176 );
177 match self.sender.send_item(notification.clone()) {
178 Ok(()) => {
179 self.next_notification_id = notification_id + 1;
180 self.metrics.notifications_sent_total.increment(1);
181 Poll::Ready(Ok(()))
182 }
183 Err(err) => Poll::Ready(Err(err)),
184 }
185 }
186}
187
188#[derive(Metrics)]
190#[metrics(scope = "exex.manager")]
191pub struct ExExManagerMetrics {
192 max_capacity: Gauge,
194 current_capacity: Gauge,
196 buffer_size: Gauge,
200 num_exexs: Gauge,
202}
203
204#[derive(Debug)]
214pub struct ExExManager<P, N: NodePrimitives> {
215 provider: P,
217
218 exex_handles: Vec<ExExHandle<N>>,
220
221 handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,
223
224 min_id: usize,
226 next_id: usize,
228 buffer: VecDeque<(usize, ExExNotification<N>)>,
233 max_capacity: usize,
235 current_capacity: Arc<AtomicUsize>,
239
240 is_ready: watch::Sender<bool>,
242
243 finished_height: watch::Sender<FinishedExExHeight>,
245
246 wal: Wal<N>,
248 finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
250
251 handle: ExExManagerHandle<N>,
253 metrics: ExExManagerMetrics,
255}
256
257impl<P, N> ExExManager<P, N>
258where
259 N: NodePrimitives,
260{
261 pub fn new(
269 provider: P,
270 handles: Vec<ExExHandle<N>>,
271 max_capacity: usize,
272 wal: Wal<N>,
273 finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
274 ) -> Self {
275 let num_exexs = handles.len();
276
277 let (handle_tx, handle_rx) = mpsc::unbounded_channel();
278 let (is_ready_tx, is_ready_rx) = watch::channel(true);
279 let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
280 FinishedExExHeight::NoExExs
281 } else {
282 FinishedExExHeight::NotReady
283 });
284
285 let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
286
287 let metrics = ExExManagerMetrics::default();
288 metrics.max_capacity.set(max_capacity as f64);
289 metrics.num_exexs.set(num_exexs as f64);
290
291 Self {
292 provider,
293
294 exex_handles: handles,
295
296 handle_rx,
297
298 min_id: 0,
299 next_id: 0,
300 buffer: VecDeque::with_capacity(max_capacity),
301 max_capacity,
302 current_capacity: Arc::clone(¤t_capacity),
303
304 is_ready: is_ready_tx,
305 finished_height: finished_height_tx,
306
307 wal,
308 finalized_header_stream,
309
310 handle: ExExManagerHandle {
311 exex_tx: handle_tx,
312 num_exexs,
313 is_ready_receiver: is_ready_rx.clone(),
314 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
315 current_capacity,
316 finished_height: finished_height_rx,
317 },
318 metrics,
319 }
320 }
321
322 pub fn handle(&self) -> ExExManagerHandle<N> {
324 self.handle.clone()
325 }
326
327 fn update_capacity(&self) {
330 let capacity = self.max_capacity.saturating_sub(self.buffer.len());
331 self.current_capacity.store(capacity, Ordering::Relaxed);
332 self.metrics.current_capacity.set(capacity as f64);
333 self.metrics.buffer_size.set(self.buffer.len() as f64);
334
335 let _ = self.is_ready.send(capacity > 0);
338 }
339
340 fn push_notification(&mut self, notification: ExExNotification<N>) {
343 let next_id = self.next_id;
344 self.buffer.push_back((next_id, notification));
345 self.next_id += 1;
346 }
347}
348
349impl<P, N> ExExManager<P, N>
350where
351 P: HeaderProvider,
352 N: NodePrimitives,
353{
354 fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
359 debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
360
361 let exex_finished_heights = self
363 .exex_handles
364 .iter()
365 .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
367 .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
369 .map(|(exex_id, num_hash)| {
371 num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
372 self.provider
373 .is_known(&num_hash.hash)
374 .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
376 })
377 })
378 .collect::<Result<Vec<_>, _>>()?;
380 if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
381 let lowest_finished_height = exex_finished_heights
385 .iter()
386 .copied()
387 .filter_map(|(_, num_hash, _)| num_hash)
388 .chain([(finalized_header.num_hash())])
389 .min_by_key(|num_hash| num_hash.number)
390 .unwrap();
391
392 self.wal.finalize(lowest_finished_height)?;
393 if self.wal.num_blocks() > WAL_BLOCKS_WARNING {
394 warn!(
395 target: "exex::manager",
396 blocks = ?self.wal.num_blocks(),
397 "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
398 );
399 }
400 } else {
401 let unfinalized_exexes = exex_finished_heights
402 .into_iter()
403 .filter_map(|(exex_id, num_hash, is_canonical)| {
404 is_canonical.not().then_some((exex_id, num_hash))
405 })
406 .format_with(", ", |(exex_id, num_hash), f| {
407 f(&format_args!("{exex_id} = {num_hash:?}"))
408 })
409 .to_string();
412 debug!(
413 target: "exex::manager",
414 %unfinalized_exexes,
415 "Not all ExExes are on the canonical chain, can't finalize the WAL"
416 );
417 }
418
419 Ok(())
420 }
421}
422
423impl<P, N> Future for ExExManager<P, N>
424where
425 P: HeaderProvider + Unpin + 'static,
426 N: NodePrimitives,
427{
428 type Output = eyre::Result<()>;
429
430 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442 let this = self.get_mut();
443
444 for exex in &mut this.exex_handles {
446 while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
447 debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
448 exex.metrics.events_sent_total.increment(1);
449 match event {
450 ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
451 }
452 }
453 }
454
455 let mut last_finalized_header = None;
457 while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
458 last_finalized_header = finalized_header;
459 }
460 if let Some(header) = last_finalized_header {
461 this.finalize_wal(header)?;
462 }
463
464 while this.buffer.len() < this.max_capacity {
466 if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
467 let committed_tip =
468 notification.committed_chain().map(|chain| chain.tip().number());
469 let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
470 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
471
472 match source {
475 ExExNotificationSource::BlockchainTree => {
476 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
477 this.wal.commit(¬ification)?;
478 }
479 ExExNotificationSource::Pipeline => {
480 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
481 }
482 }
483
484 this.push_notification(notification);
485 continue
486 }
487 break
488 }
489
490 this.update_capacity();
492
493 let mut min_id = usize::MAX;
495 for idx in (0..this.exex_handles.len()).rev() {
496 let mut exex = this.exex_handles.swap_remove(idx);
497
498 let notification_index = exex
501 .next_notification_id
502 .checked_sub(this.min_id)
503 .expect("exex expected notification ID outside the manager's range");
504 if let Some(notification) = this.buffer.get(notification_index) {
505 if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
506 return Poll::Ready(Err(err.into()))
508 }
509 }
510 min_id = min_id.min(exex.next_notification_id);
511 this.exex_handles.push(exex);
512 }
513
514 debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
516 this.buffer.retain(|&(id, _)| id >= min_id);
517 this.min_id = min_id;
518
519 this.update_capacity();
521
522 let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
524 exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
525 });
526 if let Ok(finished_height) = finished_height {
527 let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
528 }
529
530 Poll::Pending
531 }
532}
533
534#[derive(Debug)]
536pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
537 exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
539 num_exexs: usize,
541 is_ready_receiver: watch::Receiver<bool>,
547 is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
550 current_capacity: Arc<AtomicUsize>,
552 finished_height: watch::Receiver<FinishedExExHeight>,
554}
555
556impl<N: NodePrimitives> ExExManagerHandle<N> {
557 pub fn empty() -> Self {
563 let (exex_tx, _) = mpsc::unbounded_channel();
564 let (_, is_ready_rx) = watch::channel(true);
565 let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
566
567 Self {
568 exex_tx,
569 num_exexs: 0,
570 is_ready_receiver: is_ready_rx.clone(),
571 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
572 current_capacity: Arc::new(AtomicUsize::new(0)),
573 finished_height: finished_height_rx,
574 }
575 }
576
577 pub fn send(
581 &self,
582 source: ExExNotificationSource,
583 notification: ExExNotification<N>,
584 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
585 self.exex_tx.send((source, notification))
586 }
587
588 pub async fn send_async(
593 &mut self,
594 source: ExExNotificationSource,
595 notification: ExExNotification<N>,
596 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
597 self.ready().await;
598 self.exex_tx.send((source, notification))
599 }
600
601 pub fn capacity(&self) -> usize {
603 self.current_capacity.load(Ordering::Relaxed)
604 }
605
606 pub fn has_capacity(&self) -> bool {
611 self.capacity() > 0
612 }
613
614 pub const fn has_exexs(&self) -> bool {
616 self.num_exexs > 0
617 }
618
619 pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
621 self.finished_height.clone()
622 }
623
624 pub async fn ready(&mut self) {
626 poll_fn(|cx| self.poll_ready(cx)).await
627 }
628
629 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
631 let rx = ready!(self.is_ready.poll(cx));
632 self.is_ready.set(make_wait_future(rx));
633 Poll::Ready(())
634 }
635}
636
637async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
639 let _ = rx.wait_for(|ready| *ready).await;
642 rx
643}
644
645impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
646 fn clone(&self) -> Self {
647 Self {
648 exex_tx: self.exex_tx.clone(),
649 num_exexs: self.num_exexs,
650 is_ready_receiver: self.is_ready_receiver.clone(),
651 is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
652 current_capacity: self.current_capacity.clone(),
653 finished_height: self.finished_height.clone(),
654 }
655 }
656}
657
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use futures::{StreamExt, TryStreamExt};
    use rand::Rng;
    use reth_db_common::init::init_genesis;
    use reth_evm::test_utils::MockExecutorProvider;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives::SealedBlockWithSenders;
    use reth_provider::{
        providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
        BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant,
    };
    use reth_testing_utils::generators::{self, random_block, BlockParams};

    /// Builds a finalized header stream whose watch channel starts out holding `None`.
    fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
        let (sender, receiver) = watch::channel(None);
        // Leak the sender on purpose so that the channel stays open for the whole test.
        std::mem::forget(sender);
        ForkChoiceStream::new(receiver)
    }

    /// Builds a default sealed block with the given number and a hash made of `hash_byte`s.
    fn sealed_block(number: u64, hash_byte: u8) -> SealedBlockWithSenders {
        let mut block: SealedBlockWithSenders = Default::default();
        block.block.header.set_hash(B256::new([hash_byte; 32]));
        block.block.header.set_block_number(number);
        block
    }

    #[tokio::test]
    async fn test_delivers_events() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (mut handle, events, mut _notifications) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        // An event emitted by the ExEx must arrive unchanged on the handle's receiver.
        let sent_event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
        events.send(sent_event).unwrap();

        let received_event = handle.receiver.recv().await.unwrap();
        assert_eq!(received_event, sent_event);
    }

    #[tokio::test]
    async fn test_has_exexs() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (handle, _, _) = ExExHandle::new(
            "test_exex_1".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        // A manager without handles reports no ExExes.
        let without_exexs =
            ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream());
        assert!(!without_exexs.handle.has_exexs());

        // A manager with one handle reports that it has ExExes.
        let with_exex = ExExManager::new((), vec![handle], 0, wal, empty_finalized_header_stream());
        assert!(with_exex.handle.has_exexs());
    }

    #[tokio::test]
    async fn test_has_capacity() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (handle, _, _) = ExExHandle::new(
            "test_exex_1".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        // A maximum capacity of zero means the handle never has capacity.
        let zero_capacity =
            ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream());
        assert!(!zero_capacity.handle.has_capacity());

        // A fresh manager with a non-zero maximum capacity has capacity.
        let some_capacity =
            ExExManager::new((), vec![handle], 10, wal, empty_finalized_header_stream());
        assert!(some_capacity.handle.has_capacity());
    }

    #[test]
    fn test_push_notification() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (handle, _, _) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        let mut manager =
            ExExManager::new((), vec![handle], 10, wal, empty_finalized_header_stream());

        // The first notification is stored under ID 0.
        let first = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![sealed_block(10, 0x01)],
                Default::default(),
                Default::default(),
            )),
        };
        manager.push_notification(first.clone());

        assert_eq!(manager.buffer.len(), 1);
        assert_eq!(manager.buffer.front().unwrap().0, 0);
        assert_eq!(manager.buffer.front().unwrap().1, first);
        assert_eq!(manager.next_id, 1);

        // The second notification is appended behind the first one under ID 1.
        let second = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![sealed_block(20, 0x02)],
                Default::default(),
                Default::default(),
            )),
        };
        manager.push_notification(second.clone());

        assert_eq!(manager.buffer.len(), 2);
        assert_eq!(manager.buffer.front().unwrap().0, 0);
        assert_eq!(manager.buffer.front().unwrap().1, first);
        assert_eq!(manager.buffer.get(1).unwrap().0, 1);
        assert_eq!(manager.buffer.get(1).unwrap().1, second);
        assert_eq!(manager.next_id, 2);
    }

    #[test]
    fn test_update_capacity() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (handle, _, _) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        let max_capacity = 5;
        let mut manager = ExExManager::new(
            (),
            vec![handle],
            max_capacity,
            wal,
            empty_finalized_header_stream(),
        );

        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![sealed_block(10, 0x01)],
                Default::default(),
                Default::default(),
            )),
        };

        // Two buffered notifications take two slots of the capacity.
        manager.push_notification(notification.clone());
        manager.push_notification(notification);
        manager.update_capacity();
        assert_eq!(manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);

        // Emptying the buffer restores the full capacity.
        manager.buffer.clear();
        manager.update_capacity();
        assert_eq!(manager.current_capacity.load(Ordering::Relaxed), max_capacity);
    }

    #[tokio::test]
    async fn test_updates_block_height() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();

        let (handle, events, mut _notifications) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        // No finished height is known before the ExEx emits an event.
        assert!(handle.finished_height.is_none());

        let finished = BlockNumHash::new(42, B256::random());
        events.send(ExExEvent::FinishedHeight(finished)).unwrap();

        let manager = ExExManager::new(
            provider_factory,
            vec![handle],
            10,
            Wal::new(wal_dir.path()).unwrap(),
            empty_finalized_header_stream(),
        );

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        // A single poll processes the queued event.
        let mut manager = std::pin::pin!(manager);
        let _ = manager.as_mut().poll(&mut cx);

        let polled_handle = &manager.exex_handles[0];
        assert_eq!(polled_handle.finished_height, Some(finished));

        // The finished height is also published on the watch channel.
        let mut height_rx = manager.handle.finished_height();
        height_rx.changed().await.unwrap();

        let published = *height_rx.borrow();
        assert_eq!(published, FinishedExExHeight::Height(42));
    }

    #[tokio::test]
    async fn test_updates_block_height_lower() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();

        let (first_handle, first_events, _) = ExExHandle::new(
            "test_exex1".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );
        let (second_handle, second_events, _) = ExExHandle::new(
            "test_exex2".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        // The second ExEx reports a lower height than the first one.
        let first_height = BlockNumHash::new(42, B256::random());
        let second_height = BlockNumHash::new(10, B256::random());

        first_events.send(ExExEvent::FinishedHeight(first_height)).unwrap();
        second_events.send(ExExEvent::FinishedHeight(second_height)).unwrap();

        let manager = ExExManager::new(
            provider_factory,
            vec![first_handle, second_handle],
            10,
            Wal::new(wal_dir.path()).unwrap(),
            empty_finalized_header_stream(),
        );

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        let mut manager = std::pin::pin!(manager);
        let _ = manager.as_mut().poll(&mut cx);

        let mut height_rx = manager.handle.finished_height();
        height_rx.changed().await.unwrap();

        // The published height is the lower of the two.
        let published = *height_rx.borrow();
        assert_eq!(published, FinishedExExHeight::Height(10));
    }

    #[tokio::test]
    async fn test_updates_block_height_greater() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();

        let (first_handle, first_events, _) = ExExHandle::new(
            "test_exex1".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );
        let (second_handle, second_events, _) = ExExHandle::new(
            "test_exex2".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        assert!(first_handle.finished_height.is_none());

        // The second ExEx reports a greater height than the first one.
        let first_height = BlockNumHash::new(42, B256::random());
        let second_height = BlockNumHash::new(100, B256::random());

        first_events.send(ExExEvent::FinishedHeight(first_height)).unwrap();
        second_events.send(ExExEvent::FinishedHeight(second_height)).unwrap();

        let manager = ExExManager::new(
            provider_factory,
            vec![first_handle, second_handle],
            10,
            Wal::new(wal_dir.path()).unwrap(),
            empty_finalized_header_stream(),
        );

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        let mut manager = std::pin::pin!(manager);
        let _ = manager.as_mut().poll(&mut cx);

        let mut height_rx = manager.handle.finished_height();
        height_rx.changed().await.unwrap();

        // The published height stays at the lower of the two.
        let published = *height_rx.borrow();
        assert_eq!(published, FinishedExExHeight::Height(42));
    }

    #[tokio::test]
    async fn test_exex_manager_capacity() {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();

        let (handle, _, _) = ExExHandle::new(
            "test_exex_1".to_string(),
            Head::default(),
            (),
            MockExecutorProvider::default(),
            wal.handle(),
        );

        let max_capacity = 2;
        let manager = ExExManager::new(
            provider_factory,
            vec![handle],
            max_capacity,
            Wal::new(wal_dir.path()).unwrap(),
            empty_finalized_header_stream(),
        );

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![Default::default()],
                Default::default(),
                Default::default(),
            )),
        };

        // Queue one notification more than the buffer can hold.
        for _ in 0..3 {
            manager
                .handle
                .exex_tx
                .send((ExExNotificationSource::BlockchainTree, notification.clone()))
                .unwrap();
        }

        let mut manager = std::pin::pin!(manager);

        // Nothing is buffered before the manager has been polled.
        assert_eq!(manager.next_id, 0);
        assert_eq!(manager.buffer.len(), 0);

        let _ = manager.as_mut().poll(&mut cx);

        // Only `max_capacity` notifications were moved into the buffer.
        assert_eq!(manager.next_id, 2);
        assert_eq!(manager.buffer.len(), 2);
    }

    #[tokio::test]
    async fn exex_handle_new() {
        let provider_factory = create_test_provider_factory();
        init_genesis(&provider_factory).unwrap();
        let provider = BlockchainProvider2::new(provider_factory).unwrap();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (mut handle, _, mut notifications) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            provider,
            EthExecutorProvider::mainnet(),
            wal.handle(),
        );

        // A fresh handle keeps its ID and starts at notification 0.
        assert_eq!(handle.id, "test_exex");
        assert_eq!(handle.next_notification_id, 0);

        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![sealed_block(10, 0x01), sealed_block(11, 0x02)],
                Default::default(),
                Default::default(),
            )),
        };

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        // Without a finished height the notification is delivered to the ExEx.
        let outcome = handle.send(&mut cx, &(22, notification.clone()));
        match outcome {
            Poll::Ready(Ok(())) => {
                let received_notification = notifications.next().await.unwrap().unwrap();
                assert_eq!(received_notification, notification);
            }
            Poll::Pending => panic!("Notification send is pending"),
            Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e),
        }

        // The handle now expects the notification that follows the one just sent.
        assert_eq!(handle.next_notification_id, 23);
    }

    #[tokio::test]
    async fn test_notification_if_finished_height_gt_chain_tip() {
        let provider_factory = create_test_provider_factory();
        init_genesis(&provider_factory).unwrap();
        let provider = BlockchainProvider2::new(provider_factory).unwrap();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (mut handle, _, mut notifications) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            provider,
            EthExecutorProvider::mainnet(),
            wal.handle(),
        );

        // The finished height (15) is above the tip of the committed chain (10).
        handle.finished_height = Some(BlockNumHash::new(15, B256::random()));

        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![sealed_block(10, 0x01)],
                Default::default(),
                Default::default(),
            )),
        };

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        let outcome = handle.send(&mut cx, &(22, notification));
        match outcome {
            Poll::Ready(Ok(())) => {
                // The send reports success, but nothing reaches the ExEx.
                poll_fn(|cx| {
                    assert!(notifications.poll_next_unpin(cx).is_pending());
                    Poll::Ready(())
                })
                .await;
            }
            Poll::Pending | Poll::Ready(Err(_)) => {
                panic!("Notification should not be pending or fail");
            }
        }

        // The skipped notification still advances the next notification ID.
        assert_eq!(handle.next_notification_id, 23);
    }

    #[tokio::test]
    async fn test_sends_chain_reorged_notification() {
        let provider_factory = create_test_provider_factory();
        init_genesis(&provider_factory).unwrap();
        let provider = BlockchainProvider2::new(provider_factory).unwrap();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (mut handle, _, mut notifications) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            provider,
            EthExecutorProvider::mainnet(),
            wal.handle(),
        );

        let notification = ExExNotification::ChainReorged {
            old: Arc::new(Chain::default()),
            new: Arc::new(Chain::default()),
        };

        // Even the highest possible finished height must not suppress a reorg notification.
        handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        let outcome = handle.send(&mut cx, &(22, notification.clone()));
        match outcome {
            Poll::Ready(Ok(())) => {
                let received_notification = notifications.next().await.unwrap().unwrap();
                assert_eq!(received_notification, notification);
            }
            Poll::Pending | Poll::Ready(Err(_)) => {
                panic!("Notification should not be pending or fail")
            }
        }

        assert_eq!(handle.next_notification_id, 23);
    }

    #[tokio::test]
    async fn test_sends_chain_reverted_notification() {
        let provider_factory = create_test_provider_factory();
        init_genesis(&provider_factory).unwrap();
        let provider = BlockchainProvider2::new(provider_factory).unwrap();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (mut handle, _, mut notifications) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            provider,
            EthExecutorProvider::mainnet(),
            wal.handle(),
        );

        let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };

        // Even the highest possible finished height must not suppress a revert notification.
        handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        let outcome = handle.send(&mut cx, &(22, notification.clone()));
        match outcome {
            Poll::Ready(Ok(())) => {
                let received_notification = notifications.next().await.unwrap().unwrap();
                assert_eq!(received_notification, notification);
            }
            Poll::Pending | Poll::Ready(Err(_)) => {
                panic!("Notification should not be pending or fail")
            }
        }

        assert_eq!(handle.next_notification_id, 23);
    }

    #[tokio::test]
    async fn test_exex_wal() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let mut rng = generators::rng();

        // Set up a provider that knows the genesis block and one block on top of it.
        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory).unwrap();
        let genesis_block = provider_factory
            .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
            .unwrap()
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), ..Default::default() },
        )
        .seal_with_senders::<reth_primitives::Block>()
        .unwrap();
        let provider_rw = provider_factory.database_provider_rw().unwrap();
        provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap();
        provider_rw.commit().unwrap();

        let provider = BlockchainProvider2::new(provider_factory).unwrap();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let (handle, events, mut notifications) = ExExHandle::new(
            "test_exex".to_string(),
            Head::default(),
            provider.clone(),
            EthExecutorProvider::mainnet(),
            wal.handle(),
        );

        let genesis_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
        };
        let block_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
        };

        // The finalized header stream starts out with the genesis header.
        let (finalized_headers, finalized_rx) = watch::channel(None);
        finalized_headers.send(Some(genesis_block.header.clone()))?;
        let finalized_header_stream = ForkChoiceStream::new(finalized_rx);

        let mut manager = std::pin::pin!(ExExManager::new(
            provider,
            vec![handle],
            2,
            wal,
            finalized_header_stream
        ));

        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        // The genesis notification comes from the pipeline, the block notification from the
        // blockchain tree.
        manager.handle().send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
        manager
            .handle()
            .send(ExExNotificationSource::BlockchainTree, block_notification.clone())?;

        // Each poll of the manager delivers one notification to the ExEx.
        assert!(manager.as_mut().poll(&mut cx)?.is_pending());
        assert_eq!(
            notifications.try_poll_next_unpin(&mut cx)?,
            Poll::Ready(Some(genesis_notification))
        );
        assert!(manager.as_mut().poll(&mut cx)?.is_pending());
        assert_eq!(
            notifications.try_poll_next_unpin(&mut cx)?,
            Poll::Ready(Some(block_notification.clone()))
        );
        // Only the blockchain tree notification was committed to the WAL.
        assert_eq!(
            manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
            [block_notification.clone()]
        );

        // The ExEx has not reported a finished height yet, so a new finalized header leaves
        // the WAL untouched.
        finalized_headers.send(Some(block.header.clone()))?;
        assert!(manager.as_mut().poll(&mut cx).is_pending());
        assert_eq!(
            manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
            [block_notification.clone()]
        );

        // The ExEx reports a random finished height, which the WAL finalization ignores.
        events
            .send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
            .unwrap();

        finalized_headers.send(Some(block.header.clone()))?;
        assert!(manager.as_mut().poll(&mut cx).is_pending());
        assert_eq!(
            manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
            [block_notification]
        );

        // The ExEx reports the inserted block as finished, so the next finalized header
        // clears the WAL.
        events.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();

        finalized_headers.send(Some(block.header.clone()))?;
        assert!(manager.as_mut().poll(&mut cx).is_pending());
        assert_eq!(manager.wal.iter_notifications()?.next().transpose()?, None);

        Ok(())
    }
}