1#![allow(dead_code)]
2
3mod cache;
4pub use cache::BlockCache;
5mod storage;
6use reth_node_api::NodePrimitives;
7use reth_primitives::EthPrimitives;
8pub use storage::Storage;
9mod metrics;
10use metrics::Metrics;
11
12use std::{
13 path::Path,
14 sync::{
15 atomic::{AtomicU32, Ordering},
16 Arc,
17 },
18};
19
20use alloy_eips::BlockNumHash;
21use alloy_primitives::B256;
22use parking_lot::{RwLock, RwLockReadGuard};
23use reth_exex_types::ExExNotification;
24use reth_tracing::tracing::{debug, instrument};
25
26#[derive(Debug, Clone)]
37pub struct Wal<N: NodePrimitives = EthPrimitives> {
38 inner: Arc<WalInner<N>>,
39}
40
41impl<N> Wal<N>
42where
43 N: NodePrimitives,
44{
45 pub fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
47 Ok(Self { inner: Arc::new(WalInner::new(directory)?) })
48 }
49
50 pub fn handle(&self) -> WalHandle<N> {
52 WalHandle { wal: self.inner.clone() }
53 }
54
55 pub fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> {
57 self.inner.commit(notification)
58 }
59
60 pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
65 self.inner.finalize(to_block)
66 }
67
68 pub fn iter_notifications(
70 &self,
71 ) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> {
72 self.inner.iter_notifications()
73 }
74
75 pub fn num_blocks(&self) -> usize {
77 self.inner.block_cache().num_blocks()
78 }
79}
80
81#[derive(Debug)]
83struct WalInner<N: NodePrimitives> {
84 next_file_id: AtomicU32,
85 storage: Storage<N>,
87 block_cache: RwLock<BlockCache>,
89 metrics: Metrics,
90}
91
92impl<N> WalInner<N>
93where
94 N: NodePrimitives,
95{
96 fn new(directory: impl AsRef<Path>) -> eyre::Result<Self> {
97 let mut wal = Self {
98 next_file_id: AtomicU32::new(0),
99 storage: Storage::new(directory)?,
100 block_cache: RwLock::new(BlockCache::default()),
101 metrics: Metrics::default(),
102 };
103 wal.fill_block_cache()?;
104 Ok(wal)
105 }
106
107 fn block_cache(&self) -> RwLockReadGuard<'_, BlockCache> {
108 self.block_cache.read()
109 }
110
111 #[instrument(skip(self))]
113 fn fill_block_cache(&mut self) -> eyre::Result<()> {
114 let Some(files_range) = self.storage.files_range()? else { return Ok(()) };
115 self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
116
117 let mut block_cache = self.block_cache.write();
118 let mut notifications_size = 0;
119
120 for entry in self.storage.iter_notifications(files_range) {
121 let (file_id, size, notification) = entry?;
122
123 notifications_size += size;
124
125 let committed_chain = notification.committed_chain();
126 let reverted_chain = notification.reverted_chain();
127
128 debug!(
129 target: "exex::wal",
130 ?file_id,
131 reverted_block_range = ?reverted_chain.as_ref().map(|chain| chain.range()),
132 committed_block_range = ?committed_chain.as_ref().map(|chain| chain.range()),
133 "Inserting block cache entries"
134 );
135
136 block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification);
137 }
138
139 self.update_metrics(&block_cache, notifications_size as i64);
140
141 Ok(())
142 }
143
144 #[instrument(skip_all, fields(
145 reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
146 committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
147 ))]
148 fn commit(&self, notification: &ExExNotification<N>) -> eyre::Result<()> {
149 let mut block_cache = self.block_cache.write();
150
151 let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
152 let size = self.storage.write_notification(file_id, notification)?;
153
154 debug!(target: "exex::wal", ?file_id, "Inserting notification blocks into the block cache");
155 block_cache.insert_notification_blocks_with_file_id(file_id, notification);
156
157 self.update_metrics(&block_cache, size as i64);
158
159 Ok(())
160 }
161
162 #[instrument(skip(self))]
163 fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
164 let mut block_cache = self.block_cache.write();
165 let file_ids = block_cache.remove_before(to_block.number);
166
167 if file_ids.is_empty() {
169 debug!(target: "exex::wal", "No notifications were finalized from the storage");
170 return Ok(())
171 }
172
173 let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
174 debug!(target: "exex::wal", ?removed_notifications, ?removed_size, "Storage was finalized");
175
176 self.update_metrics(&block_cache, -(removed_size as i64));
177
178 Ok(())
179 }
180
181 fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
182 self.metrics.size_bytes.increment(size_delta as f64);
183 self.metrics.notifications_count.set(block_cache.notification_max_blocks.len() as f64);
184 self.metrics.committed_blocks_count.set(block_cache.committed_blocks.len() as f64);
185
186 if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
187 self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
188 }
189
190 if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
191 self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
192 }
193 }
194
195 fn iter_notifications(
197 &self,
198 ) -> eyre::Result<Box<dyn Iterator<Item = eyre::Result<ExExNotification<N>>> + '_>> {
199 let Some(range) = self.storage.files_range()? else {
200 return Ok(Box::new(std::iter::empty()))
201 };
202
203 Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
204 }
205}
206
207#[derive(Debug)]
209pub struct WalHandle<N: NodePrimitives> {
210 wal: Arc<WalInner<N>>,
211}
212
213impl<N> WalHandle<N>
214where
215 N: NodePrimitives,
216{
217 pub fn get_committed_notification_by_block_hash(
219 &self,
220 block_hash: &B256,
221 ) -> eyre::Result<Option<ExExNotification<N>>> {
222 let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash)
223 else {
224 return Ok(None)
225 };
226
227 self.wal
228 .storage
229 .read_notification(file_id)
230 .map(|entry| entry.map(|(notification, _)| notification))
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use std::sync::Arc;
237
238 use alloy_primitives::B256;
239 use eyre::OptionExt;
240 use itertools::Itertools;
241 use reth_exex_types::ExExNotification;
242 use reth_provider::Chain;
243 use reth_testing_utils::generators::{
244 self, random_block, random_block_range, BlockParams, BlockRangeParams,
245 };
246
247 use crate::wal::{cache::CachedBlock, Wal};
248
249 fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
250 wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| {
251 wal.inner
252 .storage
253 .iter_notifications(range)
254 .map(|entry| entry.map(|(_, _, n)| n))
255 .collect()
256 })
257 }
258
259 fn sort_committed_blocks(
260 committed_blocks: Vec<(B256, u32, CachedBlock)>,
261 ) -> Vec<(B256, u32, CachedBlock)> {
262 committed_blocks
263 .into_iter()
264 .sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
265 .collect()
266 }
267
268 #[test]
269 fn test_wal() -> eyre::Result<()> {
270 reth_tracing::init_test_tracing();
271
272 let mut rng = generators::rng();
273
274 let temp_dir = tempfile::tempdir()?;
276 let wal = Wal::new(&temp_dir)?;
277 assert!(wal.inner.block_cache().is_empty());
278
279 let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default())
281 .into_iter()
282 .map(|block| {
283 block
284 .seal_with_senders::<reth_primitives::Block>()
285 .ok_or_eyre("failed to recover senders")
286 })
287 .collect::<eyre::Result<Vec<_>>>()?;
288 let block_1_reorged = random_block(
289 &mut rng,
290 1,
291 BlockParams { parent: Some(blocks[0].hash()), ..Default::default() },
292 )
293 .seal_with_senders::<reth_primitives::Block>()
294 .ok_or_eyre("failed to recover senders")?;
295 let block_2_reorged = random_block(
296 &mut rng,
297 2,
298 BlockParams { parent: Some(blocks[1].hash()), ..Default::default() },
299 )
300 .seal_with_senders::<reth_primitives::Block>()
301 .ok_or_eyre("failed to recover senders")?;
302
303 let committed_notification_1 = ExExNotification::ChainCommitted {
310 new: Arc::new(Chain::new(
311 vec![blocks[0].clone(), blocks[1].clone()],
312 Default::default(),
313 None,
314 )),
315 };
316 let reverted_notification = ExExNotification::ChainReverted {
317 old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)),
318 };
319 let committed_notification_2 = ExExNotification::ChainCommitted {
320 new: Arc::new(Chain::new(
321 vec![block_1_reorged.clone(), blocks[2].clone()],
322 Default::default(),
323 None,
324 )),
325 };
326 let reorged_notification = ExExNotification::ChainReorged {
327 old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)),
328 new: Arc::new(Chain::new(
329 vec![block_2_reorged.clone(), blocks[3].clone()],
330 Default::default(),
331 None,
332 )),
333 };
334
335 let file_id = 0;
340 let committed_notification_1_cache_blocks = (blocks[1].number, file_id);
341 let committed_notification_1_cache_committed_blocks = vec![
342 (
343 blocks[0].hash(),
344 file_id,
345 CachedBlock {
346 block: (blocks[0].number, blocks[0].hash()).into(),
347 parent_hash: blocks[0].parent_hash,
348 },
349 ),
350 (
351 blocks[1].hash(),
352 file_id,
353 CachedBlock {
354 block: (blocks[1].number, blocks[1].hash()).into(),
355 parent_hash: blocks[1].parent_hash,
356 },
357 ),
358 ];
359 wal.commit(&committed_notification_1)?;
360 assert_eq!(
361 wal.inner.block_cache().blocks_sorted(),
362 [committed_notification_1_cache_blocks]
363 );
364 assert_eq!(
365 wal.inner.block_cache().committed_blocks_sorted(),
366 committed_notification_1_cache_committed_blocks
367 );
368 assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
369
370 wal.commit(&reverted_notification)?;
372 let file_id = 1;
373 let reverted_notification_cache_blocks = (blocks[1].number, file_id);
374 assert_eq!(
375 wal.inner.block_cache().blocks_sorted(),
376 [reverted_notification_cache_blocks, committed_notification_1_cache_blocks]
377 );
378 assert_eq!(
379 wal.inner.block_cache().committed_blocks_sorted(),
380 committed_notification_1_cache_committed_blocks
381 );
382 assert_eq!(
383 read_notifications(&wal)?,
384 vec![committed_notification_1.clone(), reverted_notification.clone()]
385 );
386
387 wal.commit(&committed_notification_2)?;
389 let file_id = 2;
390 let committed_notification_2_cache_blocks = (blocks[2].number, file_id);
391 let committed_notification_2_cache_committed_blocks = vec![
392 (
393 block_1_reorged.hash(),
394 file_id,
395 CachedBlock {
396 block: (block_1_reorged.number, block_1_reorged.hash()).into(),
397 parent_hash: block_1_reorged.parent_hash,
398 },
399 ),
400 (
401 blocks[2].hash(),
402 file_id,
403 CachedBlock {
404 block: (blocks[2].number, blocks[2].hash()).into(),
405 parent_hash: blocks[2].parent_hash,
406 },
407 ),
408 ];
409 assert_eq!(
410 wal.inner.block_cache().blocks_sorted(),
411 [
412 committed_notification_2_cache_blocks,
413 reverted_notification_cache_blocks,
414 committed_notification_1_cache_blocks,
415 ]
416 );
417 assert_eq!(
418 wal.inner.block_cache().committed_blocks_sorted(),
419 sort_committed_blocks(
420 [
421 committed_notification_1_cache_committed_blocks.clone(),
422 committed_notification_2_cache_committed_blocks.clone()
423 ]
424 .concat()
425 )
426 );
427 assert_eq!(
428 read_notifications(&wal)?,
429 vec![
430 committed_notification_1.clone(),
431 reverted_notification.clone(),
432 committed_notification_2.clone()
433 ]
434 );
435
436 wal.commit(&reorged_notification)?;
438 let file_id = 3;
439 let reorged_notification_cache_blocks = (blocks[3].number, file_id);
440 let reorged_notification_cache_committed_blocks = vec![
441 (
442 block_2_reorged.hash(),
443 file_id,
444 CachedBlock {
445 block: (block_2_reorged.number, block_2_reorged.hash()).into(),
446 parent_hash: block_2_reorged.parent_hash,
447 },
448 ),
449 (
450 blocks[3].hash(),
451 file_id,
452 CachedBlock {
453 block: (blocks[3].number, blocks[3].hash()).into(),
454 parent_hash: blocks[3].parent_hash,
455 },
456 ),
457 ];
458 assert_eq!(
459 wal.inner.block_cache().blocks_sorted(),
460 [
461 reorged_notification_cache_blocks,
462 committed_notification_2_cache_blocks,
463 reverted_notification_cache_blocks,
464 committed_notification_1_cache_blocks,
465 ]
466 );
467 assert_eq!(
468 wal.inner.block_cache().committed_blocks_sorted(),
469 sort_committed_blocks(
470 [
471 committed_notification_1_cache_committed_blocks,
472 committed_notification_2_cache_committed_blocks.clone(),
473 reorged_notification_cache_committed_blocks.clone()
474 ]
475 .concat()
476 )
477 );
478 assert_eq!(
479 read_notifications(&wal)?,
480 vec![
481 committed_notification_1,
482 reverted_notification,
483 committed_notification_2.clone(),
484 reorged_notification.clone()
485 ]
486 );
487
488 wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?;
493 assert_eq!(
494 wal.inner.block_cache().blocks_sorted(),
495 [reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
496 );
497 assert_eq!(
498 wal.inner.block_cache().committed_blocks_sorted(),
499 sort_committed_blocks(
500 [
501 committed_notification_2_cache_committed_blocks.clone(),
502 reorged_notification_cache_committed_blocks.clone()
503 ]
504 .concat()
505 )
506 );
507 assert_eq!(
508 read_notifications(&wal)?,
509 vec![committed_notification_2.clone(), reorged_notification.clone()]
510 );
511
512 let wal = Wal::new(&temp_dir)?;
514 assert_eq!(
515 wal.inner.block_cache().blocks_sorted(),
516 [reorged_notification_cache_blocks, committed_notification_2_cache_blocks]
517 );
518 assert_eq!(
519 wal.inner.block_cache().committed_blocks_sorted(),
520 sort_committed_blocks(
521 [
522 committed_notification_2_cache_committed_blocks,
523 reorged_notification_cache_committed_blocks
524 ]
525 .concat()
526 )
527 );
528 assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);
529
530 Ok(())
531 }
532}