reth_exex/wal/
mod.rs

1#![allow(dead_code)]
2
3mod cache;
4pub use cache::BlockCache;
5mod storage;
6use reth_node_api::NodePrimitives;
7use reth_primitives::EthPrimitives;
8pub use storage::Storage;
9mod metrics;
10use metrics::Metrics;
11
12use std::{
13    path::Path,
14    sync::{
15        atomic::{AtomicU32, Ordering},
16        Arc,
17    },
18};
19
20use alloy_eips::BlockNumHash;
21use alloy_primitives::B256;
22use parking_lot::{RwLock, RwLockReadGuard};
23use reth_exex_types::ExExNotification;
24use reth_tracing::tracing::{debug, instrument};
25
26/// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes.
27///
28/// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache
29/// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory
30/// and decoding notifications every time we want to iterate or finalize the WAL.
31///
32/// The expected mode of operation is as follows:
33/// 1. On every new canonical chain notification, call [`Wal::commit`].
34/// 2. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the
35///    WAL.
36#[derive(Debug, Clone)]
37pub struct Wal<N: NodePrimitives = EthPrimitives> {
38    inner: Arc<WalInner<N>>,
39}
40
41impl<N> Wal<N>
42where
43    N: NodePrimitives,
44{
45    /// Creates a new instance of [`Wal`].
46    pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
47        Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
48    }
49
50    /// Returns a read-only handle to the WAL.
51    pub fn handle(&self) -> WalHandle<N> {
52        WalHandle { wal: self.inner.clone() }
53    }
54
55    /// Commits the notification to WAL.
56    pub fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> {
57        self.inner.commit(notification)
58    }
59
60    /// Finalizes the WAL up to the given canonical block, inclusive.
61    ///
62    /// The caller should check that all ExExes are on the canonical chain and will not need any
63    /// blocks from the WAL below the provided block, inclusive.
64    pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
65        self.inner.finalize(to_block)
66    }
67
68    /// Returns an iterator over all notifications in the WAL.
69    pub fn iter_notifications(
70        &self,
71    ) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> {
72        self.inner.iter_notifications()
73    }
74
75    /// Returns the number of blocks in the WAL.
76    pub fn num_blocks(&self) -> usize {
77        self.inner.block_cache().num_blocks()
78    }
79}
80
81/// Inner type for the WAL.
82#[derive(Debug)]
83struct WalInner<N: NodePrimitives> {
84    next_file_id: AtomicU32,
85    /// The underlying WAL storage backed by a file.
86    storage: Storage<N>,
87    /// WAL block cache. See [`cache::BlockCache`] docs for more details.
88    block_cache: RwLock<BlockCache>,
89    metrics: Metrics,
90}
91
92impl<N> WalInner<N>
93where
94    N: NodePrimitives,
95{
96    fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
97        let mut wal = Self {
98            next_file_id: AtomicU32::new(0),
99            storage: Storage::new(directory)?,
100            block_cache: RwLock::new(BlockCache::default()),
101            metrics: Metrics::default(),
102        };
103        wal.fill_block_cache()?;
104        Ok(wal)
105    }
106
107    fn block_cache(&self) -> RwLockReadGuard<'_, BlockCache> {
108        self.block_cache.read()
109    }
110
111    /// Fills the block cache with the notifications from the storage.
112    #[instrument(skip(self))]
113    fn fill_block_cache(&mut self) -> eyre::Result<()> {
114        let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
115        self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
116
117        let mut block_cache = self.block_cache.write();
118        let mut notifications_size = 0;
119
120        for entry in self.storage.iter_notifications(files_range) {
121            let (file_id, size, notification) = entry?;
122
123            notifications_size += size;
124
125            let committed_chain = notification.committed_chain();
126            let reverted_chain = notification.reverted_chain();
127
128            debug!(
129                target: "exex::wal",
130                ?file_id,
131                reverted_block_range = ?reverted_chain.as_ref().map(|chain| chain.range()),
132                committed_block_range = ?committed_chain.as_ref().map(|chain| chain.range()),
133                "Inserting block cache entries"
134            );
135
136            block_cache.insert_notification_blocks_with_file_id(file_id, &notification);
137        }
138
139        self.update_metrics(&block_cache, notifications_size as i64);
140
141        Ok(())
142    }
143
144    #[instrument(skip_all, fields(
145        reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
146        committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
147    ))]
148    fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> {
149        let mut block_cache = self.block_cache.write();
150
151        let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
152        let size = self.storage.write_notification(file_id, notification)?;
153
154        debug!(target: "exex::wal", ?file_id, "Inserting notification blocks into the block cache");
155        block_cache.insert_notification_blocks_with_file_id(file_id, notification);
156
157        self.update_metrics(&block_cache, size as i64);
158
159        Ok(())
160    }
161
162    #[instrument(skip(self))]
163    fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
164        let mut block_cache = self.block_cache.write();
165        let file_ids = block_cache.remove_before(to_block.number);
166
167        // Remove notifications from the storage.
168        if file_ids.is_empty() {
169            debug!(target: "exex::wal", "No notifications were finalized from the storage");
170            return Ok(())
171        }
172
173        let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
174        debug!(target: "exex::wal", ?removed_notifications, ?removed_size, "Storage was finalized");
175
176        self.update_metrics(&block_cache, -(removed_size as i64));
177
178        Ok(())
179    }
180
181    fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
182        self.metrics.size_bytes.increment(size_delta as f64);
183        self.metrics.notifications_count.set(block_cache.notification_max_blocks.len() as f64);
184        self.metrics.committed_blocks_count.set(block_cache.committed_blocks.len() as f64);
185
186        if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
187            self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
188        }
189
190        if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
191            self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
192        }
193    }
194
195    /// Returns an iterator over all notifications in the WAL.
196    fn iter_notifications(
197        &self,
198    ) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> {
199        let Some(range) = self.storage.files_range()? else {
200            return Ok(Box::new(std::iter::empty()))
201        };
202
203        Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
204    }
205}
206
207/// A read-only handle to the WAL that can be shared.
208#[derive(Debug)]
209pub struct WalHandle<N: NodePrimitives> {
210    wal: Arc<WalInner<N>>,
211}
212
213impl<N> WalHandle<N>
214where
215    N: NodePrimitives,
216{
217    /// Returns the notification for the given committed block hash if it exists.
218    pub fn get_committed_notification_by_block_hash(
219        &self,
220        block_hash: &B256,
221    ) -> eyre::Result<Option<ExExNotification<N>>> {
222        let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
223        else {
224            return Ok(None)
225        };
226
227        self.wal
228            .storage
229            .read_notification(file_id)
230            .map(|entry| entry.map(|(notification, _)| notification))
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use std::sync::Arc;
237
238    use alloy_primitives::B256;
239    use eyre::OptionExt;
240    use itertools::Itertools;
241    use reth_exex_types::ExExNotification;
242    use reth_provider::Chain;
243    use reth_testing_utils::generators::{
244        self, random_block, random_block_range, BlockParams, BlockRangeParams,
245    };
246
247    use crate::wal::{cache::CachedBlock, Wal};
248
249    fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
250        wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| {
251            wal.inner
252                .storage
253                .iter_notifications(range)
254                .map(|entry| entry.map(|(_, _, n)| n))
255                .collect()
256        })
257    }
258
259    fn sort_committed_blocks(
260        committed_blocks: Vec<(B256, u32, CachedBlock)>,
261    ) -> Vec<(B256, u32, CachedBlock)> {
262        committed_blocks
263            .into_iter()
264            .sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
265            .collect()
266    }
267
268    #[test]
269    fn test_wal() -> eyre::Result<()> {
270        reth_tracing::init_test_tracing();
271
272        let mut rng = generators::rng();
273
274        // Create an instance of the WAL in a temporary directory
275        let temp_dir = tempfile::tempdir()?;
276        let wal = Wal::new(&temp_dir)?;
277        assert!(wal.inner.block_cache().is_empty());
278
279        // Create 4 canonical blocks and one reorged block with number 2
280        let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
281            .into_iter()
282            .map(|block| {
283                block
284                    .seal_with_senders::<reth_primitives::Block>()
285                    .ok_or_eyre("failed to recover senders")
286            })
287            .collect::<eyre::Result<Vec<_>>>()?;
288        let block_1_reorged = random_block(
289            &mut rng,
290            1,
291            BlockParams { parent: Some(blocks[0].hash()), ..Default::default() },
292        )
293        .seal_with_senders::<reth_primitives::Block>()
294        .ok_or_eyre("failed to recover senders")?;
295        let block_2_reorged = random_block(
296            &mut rng,
297            2,
298            BlockParams { parent: Some(blocks[1].hash()), ..Default::default() },
299        )
300        .seal_with_senders::<reth_primitives::Block>()
301        .ok_or_eyre("failed to recover senders")?;
302
303        // Create notifications for the above blocks.
304        // 1. Committed notification for blocks with number 0 and 1
305        // 2. Reverted notification for block with number 1
306        // 3. Committed notification for block with number 1 and 2
307        // 4. Reorged notification for block with number 2 that was reverted, and blocks with number
308        //    2 and 3 that were committed
309        let committed_notification_1 = ExExNotification::ChainCommitted {
310            new: Arc::new(Chain::new(
311                vec![blocks[0].clone(), blocks[1].clone()],
312                Default::default(),
313                None,
314            )),
315        };
316        let reverted_notification = ExExNotification::ChainReverted {
317            old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)),
318        };
319        let committed_notification_2 = ExExNotification::ChainCommitted {
320            new: Arc::new(Chain::new(
321                vec![block_1_reorged.clone(), blocks[2].clone()],
322                Default::default(),
323                None,
324            )),
325        };
326        let reorged_notification = ExExNotification::ChainReorged {
327            old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)),
328            new: Arc::new(Chain::new(
329                vec![block_2_reorged.clone(), blocks[3].clone()],
330                Default::default(),
331                None,
332            )),
333        };
334
335        // Commit notifications, verify that the block cache is updated and the notifications are
336        // written to WAL.
337
338        // First notification (commit block 0, 1)
339        let file_id = 0;
340        let committed_notification_1_cache_blocks = (blocks[1].number, file_id);
341        let committed_notification_1_cache_committed_blocks = vec![
342            (
343                blocks[0].hash(),
344                file_id,
345                CachedBlock {
346                    block: (blocks[0].number, blocks[0].hash()).into(),
347                    parent_hash: blocks[0].parent_hash,
348                },
349            ),
350            (
351                blocks[1].hash(),
352                file_id,
353                CachedBlock {
354                    block: (blocks[1].number, blocks[1].hash()).into(),
355                    parent_hash: blocks[1].parent_hash,
356                },
357            ),
358        ];
359        wal.commit(&committed_notification_1)?;
360        assert_eq!(
361            wal.inner.block_cache().blocks_sorted(),
362            [committed_notification_1_cache_blocks]
363        );
364        assert_eq!(
365            wal.inner.block_cache().committed_blocks_sorted(),
366            committed_notification_1_cache_committed_blocks
367        );
368        assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
369
370        // Second notification (revert block 1)
371        wal.commit(&reverted_notification)?;
372        let file_id = 1;
373        let reverted_notification_cache_blocks = (blocks[1].number, file_id);
374        assert_eq!(
375            wal.inner.block_cache().blocks_sorted(),
376            [reverted_notification_cache_blocks, committed_notification_1_cache_blocks]
377        );
378        assert_eq!(
379            wal.inner.block_cache().committed_blocks_sorted(),
380            committed_notification_1_cache_committed_blocks
381        );
382        assert_eq!(
383            read_notifications(&wal)?,
384            vec![committed_notification_1.clone(), reverted_notification.clone()]
385        );
386
387        // Third notification (commit block 1, 2)
388        wal.commit(&committed_notification_2)?;
389        let file_id = 2;
390        let committed_notification_2_cache_blocks = (blocks[2].number, file_id);
391        let committed_notification_2_cache_committed_blocks = vec![
392            (
393                block_1_reorged.hash(),
394                file_id,
395                CachedBlock {
396                    block: (block_1_reorged.number, block_1_reorged.hash()).into(),
397                    parent_hash: block_1_reorged.parent_hash,
398                },
399            ),
400            (
401                blocks[2].hash(),
402                file_id,
403                CachedBlock {
404                    block: (blocks[2].number, blocks[2].hash()).into(),
405                    parent_hash: blocks[2].parent_hash,
406                },
407            ),
408        ];
409        assert_eq!(
410            wal.inner.block_cache().blocks_sorted(),
411            [
412                committed_notification_2_cache_blocks,
413                reverted_notification_cache_blocks,
414                committed_notification_1_cache_blocks,
415            ]
416        );
417        assert_eq!(
418            wal.inner.block_cache().committed_blocks_sorted(),
419            sort_committed_blocks(
420                [
421                    committed_notification_1_cache_committed_blocks.clone(),
422                    committed_notification_2_cache_committed_blocks.clone()
423                ]
424                .concat()
425            )
426        );
427        assert_eq!(
428            read_notifications(&wal)?,
429            vec![
430                committed_notification_1.clone(),
431                reverted_notification.clone(),
432                committed_notification_2.clone()
433            ]
434        );
435
436        // Fourth notification (revert block 2, commit block 2, 3)
437        wal.commit(&reorged_notification)?;
438        let file_id = 3;
439        let reorged_notification_cache_blocks = (blocks[3].number, file_id);
440        let reorged_notification_cache_committed_blocks = vec![
441            (
442                block_2_reorged.hash(),
443                file_id,
444                CachedBlock {
445                    block: (block_2_reorged.number, block_2_reorged.hash()).into(),
446                    parent_hash: block_2_reorged.parent_hash,
447                },
448            ),
449            (
450                blocks[3].hash(),
451                file_id,
452                CachedBlock {
453                    block: (blocks[3].number, blocks[3].hash()).into(),
454                    parent_hash: blocks[3].parent_hash,
455                },
456            ),
457        ];
458        assert_eq!(
459            wal.inner.block_cache().blocks_sorted(),
460            [
461                reorged_notification_cache_blocks,
462                committed_notification_2_cache_blocks,
463                reverted_notification_cache_blocks,
464                committed_notification_1_cache_blocks,
465            ]
466        );
467        assert_eq!(
468            wal.inner.block_cache().committed_blocks_sorted(),
469            sort_committed_blocks(
470                [
471                    committed_notification_1_cache_committed_blocks,
472                    committed_notification_2_cache_committed_blocks.clone(),
473                    reorged_notification_cache_committed_blocks.clone()
474                ]
475                .concat()
476            )
477        );
478        assert_eq!(
479            read_notifications(&wal)?,
480            vec![
481                committed_notification_1,
482                reverted_notification,
483                committed_notification_2.clone(),
484                reorged_notification.clone()
485            ]
486        );
487
488        // Now, finalize the WAL up to the block 1. Block 1 was in the third notification that also
489        // had block 2 committed. In this case, we can't split the notification into two parts, so
490        // we preserve the whole notification in both the block cache and the storage, and delete
491        // the notifications before it.
492        wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?;
493        assert_eq!(
494            wal.inner.block_cache().blocks_sorted(),
495            [reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
496        );
497        assert_eq!(
498            wal.inner.block_cache().committed_blocks_sorted(),
499            sort_committed_blocks(
500                [
501                    committed_notification_2_cache_committed_blocks.clone(),
502                    reorged_notification_cache_committed_blocks.clone()
503                ]
504                .concat()
505            )
506        );
507        assert_eq!(
508            read_notifications(&wal)?,
509            vec![committed_notification_2.clone(), reorged_notification.clone()]
510        );
511
512        // Re-open the WAL and verify that the cache population works correctly
513        let wal = Wal::new(&temp_dir)?;
514        assert_eq!(
515            wal.inner.block_cache().blocks_sorted(),
516            [reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
517        );
518        assert_eq!(
519            wal.inner.block_cache().committed_blocks_sorted(),
520            sort_committed_blocks(
521                [
522                    committed_notification_2_cache_committed_blocks,
523                    reorged_notification_cache_committed_blocks
524                ]
525                .concat()
526            )
527        );
528        assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);
529
530        Ok(())
531    }
532}