reth_beacon_consensus/engine/hooks/
static_file.rs

1//! `StaticFile` hook for the engine implementation.
2
3use crate::{
4    engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
5    hooks::EngineHookDBAccessLevel,
6};
7use alloy_primitives::BlockNumber;
8use futures::FutureExt;
9use reth_codecs::Compact;
10use reth_db_api::table::Value;
11use reth_errors::RethResult;
12use reth_primitives::{static_file::HighestStaticFiles, NodePrimitives};
13use reth_provider::{
14    BlockReader, ChainStateBlockReader, DatabaseProviderFactory, StageCheckpointReader,
15    StaticFileProviderFactory,
16};
17use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult};
18use reth_tasks::TaskSpawner;
19use std::task::{ready, Context, Poll};
20use tokio::sync::oneshot;
21use tracing::trace;
22
23/// Manages producing static files under the control of the engine.
24///
25/// This type controls the [`StaticFileProducer`].
26#[derive(Debug)]
27pub struct StaticFileHook<Provider> {
28    /// The current state of the `static_file_producer`.
29    state: StaticFileProducerState<Provider>,
30    /// The type that can spawn the `static_file_producer` task.
31    task_spawner: Box<dyn TaskSpawner>,
32}
33
34impl<Provider> StaticFileHook<Provider>
35where
36    Provider: StaticFileProviderFactory
37        + DatabaseProviderFactory<
38            Provider: StaticFileProviderFactory<
39                Primitives: NodePrimitives<
40                    SignedTx: Value + Compact,
41                    BlockHeader: Value + Compact,
42                    Receipt: Value + Compact,
43                >,
44            > + StageCheckpointReader
45                          + BlockReader
46                          + ChainStateBlockReader,
47        > + 'static,
48{
49    /// Create a new instance
50    pub fn new(
51        static_file_producer: StaticFileProducer<Provider>,
52        task_spawner: Box<dyn TaskSpawner>,
53    ) -> Self {
54        Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner }
55    }
56
57    /// Advances the `static_file_producer` state.
58    ///
59    /// This checks for the result in the channel, or returns pending if the `static_file_producer`
60    /// is idle.
61    fn poll_static_file_producer(
62        &mut self,
63        cx: &mut Context<'_>,
64    ) -> Poll<RethResult<EngineHookEvent>> {
65        let result = match self.state {
66            StaticFileProducerState::Idle(_) => return Poll::Pending,
67            StaticFileProducerState::Running(ref mut fut) => {
68                ready!(fut.poll_unpin(cx))
69            }
70        };
71
72        let event = match result {
73            Ok((static_file_producer, result)) => {
74                self.state = StaticFileProducerState::Idle(Some(static_file_producer));
75
76                match result {
77                    Ok(_) => EngineHookEvent::Finished(Ok(())),
78                    Err(err) => EngineHookEvent::Finished(Err(EngineHookError::Common(err.into()))),
79                }
80            }
81            Err(_) => {
82                // failed to receive the static_file_producer
83                EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
84            }
85        };
86
87        Poll::Ready(Ok(event))
88    }
89
90    /// This will try to spawn the `static_file_producer` if it is idle:
91    /// 1. Check if producing static files is needed through
92    ///    [`StaticFileProducer::get_static_file_targets`](reth_static_file::StaticFileProducerInner::get_static_file_targets)
93    ///    and then [`StaticFileTargets::any`](reth_static_file::StaticFileTargets::any).
94    ///
95    /// 2.1. If producing static files is needed, pass static file request to the
96    ///      [`StaticFileProducer::run`](reth_static_file::StaticFileProducerInner::run) and
97    ///      spawn it in a separate task. Set static file producer state to
98    ///      [`StaticFileProducerState::Running`].
99    /// 2.2. If producing static files is not needed, set static file producer state back to
100    ///      [`StaticFileProducerState::Idle`].
101    ///
102    /// If `static_file_producer` is already running, do nothing.
103    fn try_spawn_static_file_producer(
104        &mut self,
105        finalized_block_number: BlockNumber,
106    ) -> RethResult<Option<EngineHookEvent>> {
107        Ok(match &mut self.state {
108            StaticFileProducerState::Idle(static_file_producer) => {
109                let Some(static_file_producer) = static_file_producer.take() else {
110                    trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle");
111                    return Ok(None)
112                };
113
114                let Some(locked_static_file_producer) = static_file_producer.try_lock_arc() else {
115                    trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
116                    return Ok(None)
117                };
118
119                let finalized_block_number = locked_static_file_producer
120                    .last_finalized_block()?
121                    .map(|on_disk| finalized_block_number.min(on_disk))
122                    .unwrap_or(finalized_block_number);
123
124                let targets =
125                    locked_static_file_producer.get_static_file_targets(HighestStaticFiles {
126                        headers: Some(finalized_block_number),
127                        receipts: Some(finalized_block_number),
128                        transactions: Some(finalized_block_number),
129                    })?;
130
131                // Check if the moving data to static files has been requested.
132                if targets.any() {
133                    let (tx, rx) = oneshot::channel();
134                    self.task_spawner.spawn_critical_blocking(
135                        "static_file_producer task",
136                        Box::pin(async move {
137                            let result = locked_static_file_producer.run(targets);
138                            let _ = tx.send((static_file_producer, result));
139                        }),
140                    );
141                    self.state = StaticFileProducerState::Running(rx);
142
143                    Some(EngineHookEvent::Started)
144                } else {
145                    self.state = StaticFileProducerState::Idle(Some(static_file_producer));
146                    Some(EngineHookEvent::NotReady)
147                }
148            }
149            StaticFileProducerState::Running(_) => None,
150        })
151    }
152}
153
154impl<Provider> EngineHook for StaticFileHook<Provider>
155where
156    Provider: StaticFileProviderFactory
157        + DatabaseProviderFactory<
158            Provider: StaticFileProviderFactory<
159                Primitives: NodePrimitives<
160                    SignedTx: Value + Compact,
161                    BlockHeader: Value + Compact,
162                    Receipt: Value + Compact,
163                >,
164            > + StageCheckpointReader
165                          + BlockReader
166                          + ChainStateBlockReader,
167        > + 'static,
168{
169    fn name(&self) -> &'static str {
170        "StaticFile"
171    }
172
173    fn poll(
174        &mut self,
175        cx: &mut Context<'_>,
176        ctx: EngineHookContext,
177    ) -> Poll<RethResult<EngineHookEvent>> {
178        let Some(finalized_block_number) = ctx.finalized_block_number else {
179            trace!(target: "consensus::engine::hooks::static_file", ?ctx, "Finalized block number is not available");
180            return Poll::Pending
181        };
182
183        // Try to spawn a static_file_producer
184        match self.try_spawn_static_file_producer(finalized_block_number)? {
185            Some(EngineHookEvent::NotReady) => return Poll::Pending,
186            Some(event) => return Poll::Ready(Ok(event)),
187            None => (),
188        }
189
190        // Poll static_file_producer and check its status
191        self.poll_static_file_producer(cx)
192    }
193
194    fn db_access_level(&self) -> EngineHookDBAccessLevel {
195        EngineHookDBAccessLevel::ReadOnly
196    }
197}
198
199/// The possible `static_file_producer` states within the sync controller.
200///
201/// [`StaticFileProducerState::Idle`] means that the static file producer is currently idle.
202/// [`StaticFileProducerState::Running`] means that the static file producer is currently running.
203#[derive(Debug)]
204enum StaticFileProducerState<Provider> {
205    /// [`StaticFileProducer`] is idle.
206    Idle(Option<StaticFileProducer<Provider>>),
207    /// [`StaticFileProducer`] is running and waiting for a response
208    Running(oneshot::Receiver<StaticFileProducerWithResult<Provider>>),
209}