reth_beacon_consensus/engine/hooks/
static_file.rs1use crate::{
4 engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
5 hooks::EngineHookDBAccessLevel,
6};
7use alloy_primitives::BlockNumber;
8use futures::FutureExt;
9use reth_codecs::Compact;
10use reth_db_api::table::Value;
11use reth_errors::RethResult;
12use reth_primitives::{static_file::HighestStaticFiles, NodePrimitives};
13use reth_provider::{
14 BlockReader, ChainStateBlockReader, DatabaseProviderFactory, StageCheckpointReader,
15 StaticFileProviderFactory,
16};
17use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult};
18use reth_tasks::TaskSpawner;
19use std::task::{ready, Context, Poll};
20use tokio::sync::oneshot;
21use tracing::trace;
22
23#[derive(Debug)]
27pub struct StaticFileHook<Provider> {
28 state: StaticFileProducerState<Provider>,
30 task_spawner: Box<dyn TaskSpawner>,
32}
33
34impl<Provider> StaticFileHook<Provider>
35where
36 Provider: StaticFileProviderFactory
37 + DatabaseProviderFactory<
38 Provider: StaticFileProviderFactory<
39 Primitives: NodePrimitives<
40 SignedTx: Value + Compact,
41 BlockHeader: Value + Compact,
42 Receipt: Value + Compact,
43 >,
44 > + StageCheckpointReader
45 + BlockReader
46 + ChainStateBlockReader,
47 > + 'static,
48{
49 pub fn new(
51 static_file_producer: StaticFileProducer<Provider>,
52 task_spawner: Box<dyn TaskSpawner>,
53 ) -> Self {
54 Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner }
55 }
56
57 fn poll_static_file_producer(
62 &mut self,
63 cx: &mut Context<'_>,
64 ) -> Poll<RethResult<EngineHookEvent>> {
65 let result = match self.state {
66 StaticFileProducerState::Idle(_) => return Poll::Pending,
67 StaticFileProducerState::Running(ref mut fut) => {
68 ready!(fut.poll_unpin(cx))
69 }
70 };
71
72 let event = match result {
73 Ok((static_file_producer, result)) => {
74 self.state = StaticFileProducerState::Idle(Some(static_file_producer));
75
76 match result {
77 Ok(_) => EngineHookEvent::Finished(Ok(())),
78 Err(err) => EngineHookEvent::Finished(Err(EngineHookError::Common(err.into()))),
79 }
80 }
81 Err(_) => {
82 EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
84 }
85 };
86
87 Poll::Ready(Ok(event))
88 }
89
90 fn try_spawn_static_file_producer(
104 &mut self,
105 finalized_block_number: BlockNumber,
106 ) -> RethResult<Option<EngineHookEvent>> {
107 Ok(match &mut self.state {
108 StaticFileProducerState::Idle(static_file_producer) => {
109 let Some(static_file_producer) = static_file_producer.take() else {
110 trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle");
111 return Ok(None)
112 };
113
114 let Some(locked_static_file_producer) = static_file_producer.try_lock_arc() else {
115 trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
116 return Ok(None)
117 };
118
119 let finalized_block_number = locked_static_file_producer
120 .last_finalized_block()?
121 .map(|on_disk| finalized_block_number.min(on_disk))
122 .unwrap_or(finalized_block_number);
123
124 let targets =
125 locked_static_file_producer.get_static_file_targets(HighestStaticFiles {
126 headers: Some(finalized_block_number),
127 receipts: Some(finalized_block_number),
128 transactions: Some(finalized_block_number),
129 })?;
130
131 if targets.any() {
133 let (tx, rx) = oneshot::channel();
134 self.task_spawner.spawn_critical_blocking(
135 "static_file_producer task",
136 Box::pin(async move {
137 let result = locked_static_file_producer.run(targets);
138 let _ = tx.send((static_file_producer, result));
139 }),
140 );
141 self.state = StaticFileProducerState::Running(rx);
142
143 Some(EngineHookEvent::Started)
144 } else {
145 self.state = StaticFileProducerState::Idle(Some(static_file_producer));
146 Some(EngineHookEvent::NotReady)
147 }
148 }
149 StaticFileProducerState::Running(_) => None,
150 })
151 }
152}
153
154impl<Provider> EngineHook for StaticFileHook<Provider>
155where
156 Provider: StaticFileProviderFactory
157 + DatabaseProviderFactory<
158 Provider: StaticFileProviderFactory<
159 Primitives: NodePrimitives<
160 SignedTx: Value + Compact,
161 BlockHeader: Value + Compact,
162 Receipt: Value + Compact,
163 >,
164 > + StageCheckpointReader
165 + BlockReader
166 + ChainStateBlockReader,
167 > + 'static,
168{
169 fn name(&self) -> &'static str {
170 "StaticFile"
171 }
172
173 fn poll(
174 &mut self,
175 cx: &mut Context<'_>,
176 ctx: EngineHookContext,
177 ) -> Poll<RethResult<EngineHookEvent>> {
178 let Some(finalized_block_number) = ctx.finalized_block_number else {
179 trace!(target: "consensus::engine::hooks::static_file", ?ctx, "Finalized block number is not available");
180 return Poll::Pending
181 };
182
183 match self.try_spawn_static_file_producer(finalized_block_number)? {
185 Some(EngineHookEvent::NotReady) => return Poll::Pending,
186 Some(event) => return Poll::Ready(Ok(event)),
187 None => (),
188 }
189
190 self.poll_static_file_producer(cx)
192 }
193
194 fn db_access_level(&self) -> EngineHookDBAccessLevel {
195 EngineHookDBAccessLevel::ReadOnly
196 }
197}
198
199#[derive(Debug)]
204enum StaticFileProducerState<Provider> {
205 Idle(Option<StaticFileProducer<Provider>>),
207 Running(oneshot::Receiver<StaticFileProducerWithResult<Provider>>),
209}