reth_engine_util/
engine_store.rs

1//! Stores engine API messages to disk for later inspection and replay.
2
3use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState};
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
6use reth_fs_util as fs;
7use serde::{Deserialize, Serialize};
8use std::{
9    collections::BTreeMap,
10    path::PathBuf,
11    pin::Pin,
12    task::{ready, Context, Poll},
13    time::SystemTime,
14};
15use tracing::*;
16
17/// A message from the engine API that has been stored to disk.
18#[derive(Debug, Serialize, Deserialize)]
19#[serde(rename_all = "camelCase")]
20pub enum StoredEngineApiMessage<Attributes> {
21    /// The on-disk representation of an `engine_forkchoiceUpdated` method call.
22    ForkchoiceUpdated {
23        /// The [`ForkchoiceState`] sent in the persisted call.
24        state: ForkchoiceState,
25        /// The payload attributes sent in the persisted call, if any.
26        payload_attrs: Option<Attributes>,
27    },
28    /// The on-disk representation of an `engine_newPayload` method call.
29    NewPayload {
30        /// The [`ExecutionPayload`] sent in the persisted call.
31        payload: ExecutionPayload,
32        /// The execution payload sidecar with additional version-specific fields received by
33        /// engine API.
34        sidecar: ExecutionPayloadSidecar,
35    },
36}
37
38/// This can read and write engine API messages in a specific directory.
39#[derive(Debug)]
40pub struct EngineMessageStore {
41    /// The path to the directory that stores the engine API messages.
42    path: PathBuf,
43}
44
45impl EngineMessageStore {
46    /// Creates a new [`EngineMessageStore`] at the given path.
47    ///
48    /// The path is expected to be a directory, where individual message JSON files will be stored.
49    pub const fn new(path: PathBuf) -> Self {
50        Self { path }
51    }
52
53    /// Stores the received [`BeaconEngineMessage`] to disk, appending the `received_at` time to the
54    /// path.
55    pub fn on_message<Engine>(
56        &self,
57        msg: &BeaconEngineMessage<Engine>,
58        received_at: SystemTime,
59    ) -> eyre::Result<()>
60    where
61        Engine: EngineTypes,
62    {
63        fs::create_dir_all(&self.path)?; // ensure that store path had been created
64        let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
65        match msg {
66            BeaconEngineMessage::ForkchoiceUpdated {
67                state,
68                payload_attrs,
69                tx: _tx,
70                version: _version,
71            } => {
72                let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
73                fs::write(
74                    self.path.join(filename),
75                    serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated {
76                        state: *state,
77                        payload_attrs: payload_attrs.clone(),
78                    })?,
79                )?;
80            }
81            BeaconEngineMessage::NewPayload { payload, sidecar, tx: _tx } => {
82                let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
83                fs::write(
84                    self.path.join(filename),
85                    serde_json::to_vec(
86                        &StoredEngineApiMessage::<Engine::PayloadAttributes>::NewPayload {
87                            payload: payload.clone(),
88                            sidecar: sidecar.clone(),
89                        },
90                    )?,
91                )?;
92            }
93            // noop
94            BeaconEngineMessage::TransitionConfigurationExchanged => (),
95        };
96        Ok(())
97    }
98
99    /// Finds and iterates through any stored engine API message files, ordered by timestamp.
100    pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
101        let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
102        for entry in fs::read_dir(&self.path)? {
103            let entry = entry?;
104            let filename = entry.file_name();
105            if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
106                if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
107                    filenames_by_ts.entry(timestamp).or_default().push(entry.path());
108                    tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
109                } else {
110                    tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
111                }
112            } else {
113                tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
114            }
115        }
116        Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
117    }
118}
119
120/// A wrapper stream that stores Engine API messages in
121/// the specified directory.
122#[derive(Debug)]
123#[pin_project::pin_project]
124pub struct EngineStoreStream<S> {
125    /// Inner message stream.
126    #[pin]
127    stream: S,
128    /// Engine message store.
129    store: EngineMessageStore,
130}
131
132impl<S> EngineStoreStream<S> {
133    /// Create new engine store stream wrapper.
134    pub const fn new(stream: S, path: PathBuf) -> Self {
135        Self { stream, store: EngineMessageStore::new(path) }
136    }
137}
138
139impl<S, Engine> Stream for EngineStoreStream<S>
140where
141    S: Stream<Item = BeaconEngineMessage<Engine>>,
142    Engine: EngineTypes,
143{
144    type Item = S::Item;
145
146    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147        let mut this = self.project();
148        let next = ready!(this.stream.poll_next_unpin(cx));
149        if let Some(msg) = &next {
150            if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
151                error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
152            }
153        }
154        Poll::Ready(next)
155    }
156}