reth_engine_util/
engine_store.rs1use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState};
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
6use reth_fs_util as fs;
7use serde::{Deserialize, Serialize};
8use std::{
9 collections::BTreeMap,
10 path::PathBuf,
11 pin::Pin,
12 task::{ready, Context, Poll},
13 time::SystemTime,
14};
15use tracing::*;
16
17#[derive(Debug, Serialize, Deserialize)]
19#[serde(rename_all = "camelCase")]
20pub enum StoredEngineApiMessage<Attributes> {
21 ForkchoiceUpdated {
23 state: ForkchoiceState,
25 payload_attrs: Option<Attributes>,
27 },
28 NewPayload {
30 payload: ExecutionPayload,
32 sidecar: ExecutionPayloadSidecar,
35 },
36}
37
38#[derive(Debug)]
40pub struct EngineMessageStore {
41 path: PathBuf,
43}
44
45impl EngineMessageStore {
46 pub const fn new(path: PathBuf) -> Self {
50 Self { path }
51 }
52
53 pub fn on_message<Engine>(
56 &self,
57 msg: &BeaconEngineMessage<Engine>,
58 received_at: SystemTime,
59 ) -> eyre::Result<()>
60 where
61 Engine: EngineTypes,
62 {
63 fs::create_dir_all(&self.path)?; let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
65 match msg {
66 BeaconEngineMessage::ForkchoiceUpdated {
67 state,
68 payload_attrs,
69 tx: _tx,
70 version: _version,
71 } => {
72 let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
73 fs::write(
74 self.path.join(filename),
75 serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated {
76 state: *state,
77 payload_attrs: payload_attrs.clone(),
78 })?,
79 )?;
80 }
81 BeaconEngineMessage::NewPayload { payload, sidecar, tx: _tx } => {
82 let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
83 fs::write(
84 self.path.join(filename),
85 serde_json::to_vec(
86 &StoredEngineApiMessage::<Engine::PayloadAttributes>::NewPayload {
87 payload: payload.clone(),
88 sidecar: sidecar.clone(),
89 },
90 )?,
91 )?;
92 }
93 BeaconEngineMessage::TransitionConfigurationExchanged => (),
95 };
96 Ok(())
97 }
98
99 pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
101 let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
102 for entry in fs::read_dir(&self.path)? {
103 let entry = entry?;
104 let filename = entry.file_name();
105 if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
106 if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
107 filenames_by_ts.entry(timestamp).or_default().push(entry.path());
108 tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
109 } else {
110 tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
111 }
112 } else {
113 tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
114 }
115 }
116 Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
117 }
118}
119
120#[derive(Debug)]
123#[pin_project::pin_project]
124pub struct EngineStoreStream<S> {
125 #[pin]
127 stream: S,
128 store: EngineMessageStore,
130}
131
132impl<S> EngineStoreStream<S> {
133 pub const fn new(stream: S, path: PathBuf) -> Self {
135 Self { stream, store: EngineMessageStore::new(path) }
136 }
137}
138
139impl<S, Engine> Stream for EngineStoreStream<S>
140where
141 S: Stream<Item = BeaconEngineMessage<Engine>>,
142 Engine: EngineTypes,
143{
144 type Item = S::Item;
145
146 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147 let mut this = self.project();
148 let next = ready!(this.stream.poll_next_unpin(cx));
149 if let Some(msg) = &next {
150 if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
151 error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
152 }
153 }
154 Poll::Ready(next)
155 }
156}