reth_basic_payload_builder/
lib.rs

1//! A basic payload generator for reth.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use crate::metrics::PayloadBuilderMetrics;
12use alloy_consensus::constants::EMPTY_WITHDRAWALS;
13use alloy_eips::{eip4895::Withdrawals, merge::SLOT_DURATION};
14use alloy_primitives::{Bytes, B256, U256};
15use futures_core::ready;
16use futures_util::FutureExt;
17use reth_chainspec::EthereumHardforks;
18use reth_evm::state_change::post_block_withdrawals_balance_increments;
19use reth_payload_builder::{KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator};
20use reth_payload_builder_primitives::PayloadBuilderError;
21use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind};
22use reth_primitives::{proofs, SealedHeader};
23use reth_primitives_traits::constants::RETH_CLIENT_VERSION;
24use reth_provider::{BlockReaderIdExt, CanonStateNotification, StateProviderFactory};
25use reth_revm::cached::CachedReads;
26use reth_tasks::TaskSpawner;
27use reth_transaction_pool::TransactionPool;
28use revm::{Database, State};
29use std::{
30    fmt,
31    future::Future,
32    ops::Deref,
33    pin::Pin,
34    sync::{atomic::AtomicBool, Arc},
35    task::{Context, Poll},
36    time::{Duration, SystemTime, UNIX_EPOCH},
37};
38use tokio::{
39    sync::{oneshot, Semaphore},
40    time::{Interval, Sleep},
41};
42use tracing::{debug, trace, warn};
43
44mod metrics;
45mod stack;
46
47pub use stack::PayloadBuilderStack;
48
49/// The [`PayloadJobGenerator`] that creates [`BasicPayloadJob`]s.
50#[derive(Debug)]
51pub struct BasicPayloadJobGenerator<Client, Pool, Tasks, Builder> {
52    /// The client that can interact with the chain.
53    client: Client,
54    /// The transaction pool to pull transactions from.
55    pool: Pool,
56    /// The task executor to spawn payload building tasks on.
57    executor: Tasks,
58    /// The configuration for the job generator.
59    config: BasicPayloadJobGeneratorConfig,
60    /// Restricts how many generator tasks can be executed at once.
61    payload_task_guard: PayloadTaskGuard,
62    /// The type responsible for building payloads.
63    ///
64    /// See [`PayloadBuilder`]
65    builder: Builder,
66    /// Stored `cached_reads` for new payload jobs.
67    pre_cached: Option<PrecachedState>,
68}
69
70// === impl BasicPayloadJobGenerator ===
71
72impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks, Builder> {
73    /// Creates a new [`BasicPayloadJobGenerator`] with the given config and custom
74    /// [`PayloadBuilder`]
75    pub fn with_builder(
76        client: Client,
77        pool: Pool,
78        executor: Tasks,
79        config: BasicPayloadJobGeneratorConfig,
80        builder: Builder,
81    ) -> Self {
82        Self {
83            client,
84            pool,
85            executor,
86            payload_task_guard: PayloadTaskGuard::new(config.max_payload_tasks),
87            config,
88            builder,
89            pre_cached: None,
90        }
91    }
92
93    /// Returns the maximum duration a job should be allowed to run.
94    ///
95    /// This adheres to the following specification:
96    /// > Client software SHOULD stop the updating process when either a call to engine_getPayload
97    /// > with the build process's payloadId is made or SECONDS_PER_SLOT (12s in the Mainnet
98    /// > configuration) have passed since the point in time identified by the timestamp parameter.
99    ///
100    /// See also <https://github.com/ethereum/execution-apis/blob/431cf72fd3403d946ca3e3afc36b973fc87e0e89/src/engine/paris.md?plain=1#L137>
101    #[inline]
102    fn max_job_duration(&self, unix_timestamp: u64) -> Duration {
103        let duration_until_timestamp = duration_until(unix_timestamp);
104
105        // safety in case clocks are bad
106        let duration_until_timestamp = duration_until_timestamp.min(self.config.deadline * 3);
107
108        self.config.deadline + duration_until_timestamp
109    }
110
111    /// Returns the [Instant](tokio::time::Instant) at which the job should be terminated because it
112    /// is considered timed out.
113    #[inline]
114    fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
115        tokio::time::Instant::now() + self.max_job_duration(unix_timestamp)
116    }
117
118    /// Returns a reference to the tasks type
119    pub const fn tasks(&self) -> &Tasks {
120        &self.executor
121    }
122
123    /// Returns the pre-cached reads for the given parent header if it matches the cached state's
124    /// block.
125    fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
126        self.pre_cached.as_ref().filter(|pc| pc.block == parent).map(|pc| pc.cached.clone())
127    }
128}
129
130// === impl BasicPayloadJobGenerator ===
131
132impl<Client, Pool, Tasks, Builder> PayloadJobGenerator
133    for BasicPayloadJobGenerator<Client, Pool, Tasks, Builder>
134where
135    Client: StateProviderFactory
136        + BlockReaderIdExt<Header = alloy_consensus::Header>
137        + Clone
138        + Unpin
139        + 'static,
140    Pool: TransactionPool + Unpin + 'static,
141    Tasks: TaskSpawner + Clone + Unpin + 'static,
142    Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
143    <Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
144    <Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
145{
146    type Job = BasicPayloadJob<Client, Pool, Tasks, Builder>;
147
148    fn new_payload_job(
149        &self,
150        attributes: <Self::Job as PayloadJob>::PayloadAttributes,
151    ) -> Result<Self::Job, PayloadBuilderError> {
152        let parent_header = if attributes.parent().is_zero() {
153            // Use latest header for genesis block case
154            self.client
155                .latest_header()
156                .map_err(PayloadBuilderError::from)?
157                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(B256::ZERO))?
158        } else {
159            // Fetch specific header by hash
160            self.client
161                .sealed_header_by_hash(attributes.parent())
162                .map_err(PayloadBuilderError::from)?
163                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(attributes.parent()))?
164        };
165
166        let config = PayloadConfig::new(Arc::new(parent_header.clone()), attributes);
167
168        let until = self.job_deadline(config.attributes.timestamp());
169        let deadline = Box::pin(tokio::time::sleep_until(until));
170
171        let cached_reads = self.maybe_pre_cached(parent_header.hash());
172
173        let mut job = BasicPayloadJob {
174            config,
175            client: self.client.clone(),
176            pool: self.pool.clone(),
177            executor: self.executor.clone(),
178            deadline,
179            // ticks immediately
180            interval: tokio::time::interval(self.config.interval),
181            best_payload: PayloadState::Missing,
182            pending_block: None,
183            cached_reads,
184            payload_task_guard: self.payload_task_guard.clone(),
185            metrics: Default::default(),
186            builder: self.builder.clone(),
187        };
188
189        // start the first job right away
190        job.spawn_build_job();
191
192        Ok(job)
193    }
194
195    fn on_new_state(&mut self, new_state: CanonStateNotification) {
196        let mut cached = CachedReads::default();
197
198        // extract the state from the notification and put it into the cache
199        let committed = new_state.committed();
200        let new_execution_outcome = committed.execution_outcome();
201        for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
202            if let Some(info) = acc.info.clone() {
203                // we want pre cache existing accounts and their storage
204                // this only includes changed accounts and storage but is better than nothing
205                let storage =
206                    acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
207                cached.insert_account(addr, info, storage);
208            }
209        }
210
211        self.pre_cached = Some(PrecachedState { block: committed.tip().hash(), cached });
212    }
213}
214
215/// Pre-filled [`CachedReads`] for a specific block.
216///
217/// This is extracted from the [`CanonStateNotification`] for the tip block.
218#[derive(Debug, Clone)]
219pub struct PrecachedState {
220    /// The block for which the state is pre-cached.
221    pub block: B256,
222    /// Cached state for the block.
223    pub cached: CachedReads,
224}
225
226/// Restricts how many generator tasks can be executed at once.
227#[derive(Debug, Clone)]
228pub struct PayloadTaskGuard(Arc<Semaphore>);
229
230impl Deref for PayloadTaskGuard {
231    type Target = Semaphore;
232
233    fn deref(&self) -> &Self::Target {
234        &self.0
235    }
236}
237
238// === impl PayloadTaskGuard ===
239
240impl PayloadTaskGuard {
241    /// Constructs `Self` with a maximum task count of `max_payload_tasks`.
242    pub fn new(max_payload_tasks: usize) -> Self {
243        Self(Arc::new(Semaphore::new(max_payload_tasks)))
244    }
245}
246
247/// Settings for the [`BasicPayloadJobGenerator`].
248#[derive(Debug, Clone)]
249pub struct BasicPayloadJobGeneratorConfig {
250    /// Data to include in the block's extra data field.
251    extradata: Bytes,
252    /// The interval at which the job should build a new payload after the last.
253    interval: Duration,
254    /// The deadline for when the payload builder job should resolve.
255    ///
256    /// By default this is [`SLOT_DURATION`]: 12s
257    deadline: Duration,
258    /// Maximum number of tasks to spawn for building a payload.
259    max_payload_tasks: usize,
260}
261
262// === impl BasicPayloadJobGeneratorConfig ===
263
264impl BasicPayloadJobGeneratorConfig {
265    /// Sets the interval at which the job should build a new payload after the last.
266    pub const fn interval(mut self, interval: Duration) -> Self {
267        self.interval = interval;
268        self
269    }
270
271    /// Sets the deadline when this job should resolve.
272    pub const fn deadline(mut self, deadline: Duration) -> Self {
273        self.deadline = deadline;
274        self
275    }
276
277    /// Sets the maximum number of tasks to spawn for building a payload(s).
278    ///
279    /// # Panics
280    ///
281    /// If `max_payload_tasks` is 0.
282    pub fn max_payload_tasks(mut self, max_payload_tasks: usize) -> Self {
283        assert!(max_payload_tasks > 0, "max_payload_tasks must be greater than 0");
284        self.max_payload_tasks = max_payload_tasks;
285        self
286    }
287
288    /// Sets the data to include in the block's extra data field.
289    ///
290    /// Defaults to the current client version: `rlp(RETH_CLIENT_VERSION)`.
291    pub fn extradata(mut self, extradata: Bytes) -> Self {
292        self.extradata = extradata;
293        self
294    }
295}
296
297impl Default for BasicPayloadJobGeneratorConfig {
298    fn default() -> Self {
299        Self {
300            extradata: alloy_rlp::encode(RETH_CLIENT_VERSION.as_bytes()).into(),
301            interval: Duration::from_secs(1),
302            // 12s slot time
303            deadline: SLOT_DURATION,
304            max_payload_tasks: 3,
305        }
306    }
307}
308
309/// A basic payload job that continuously builds a payload with the best transactions from the pool.
310#[derive(Debug)]
311pub struct BasicPayloadJob<Client, Pool, Tasks, Builder>
312where
313    Builder: PayloadBuilder<Pool, Client>,
314{
315    /// The configuration for how the payload will be created.
316    config: PayloadConfig<Builder::Attributes>,
317    /// The client that can interact with the chain.
318    client: Client,
319    /// The transaction pool.
320    pool: Pool,
321    /// How to spawn building tasks
322    executor: Tasks,
323    /// The deadline when this job should resolve.
324    deadline: Pin<Box<Sleep>>,
325    /// The interval at which the job should build a new payload after the last.
326    interval: Interval,
327    /// The best payload so far and its state.
328    best_payload: PayloadState<Builder::BuiltPayload>,
329    /// Receiver for the block that is currently being built.
330    pending_block: Option<PendingPayload<Builder::BuiltPayload>>,
331    /// Restricts how many generator tasks can be executed at once.
332    payload_task_guard: PayloadTaskGuard,
333    /// Caches all disk reads for the state the new payloads builds on
334    ///
335    /// This is used to avoid reading the same state over and over again when new attempts are
336    /// triggered, because during the building process we'll repeatedly execute the transactions.
337    cached_reads: Option<CachedReads>,
338    /// metrics for this type
339    metrics: PayloadBuilderMetrics,
340    /// The type responsible for building payloads.
341    ///
342    /// See [`PayloadBuilder`]
343    builder: Builder,
344}
345
346impl<Client, Pool, Tasks, Builder> BasicPayloadJob<Client, Pool, Tasks, Builder>
347where
348    Client: StateProviderFactory + Clone + Unpin + 'static,
349    Pool: TransactionPool + Unpin + 'static,
350    Tasks: TaskSpawner + Clone + 'static,
351    Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
352    <Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
353    <Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
354{
355    /// Spawns a new payload build task.
356    fn spawn_build_job(&mut self) {
357        trace!(target: "payload_builder", id = %self.config.payload_id(), "spawn new payload build task");
358        let (tx, rx) = oneshot::channel();
359        let client = self.client.clone();
360        let pool = self.pool.clone();
361        let cancel = Cancelled::default();
362        let _cancel = cancel.clone();
363        let guard = self.payload_task_guard.clone();
364        let payload_config = self.config.clone();
365        let best_payload = self.best_payload.payload().cloned();
366        self.metrics.inc_initiated_payload_builds();
367        let cached_reads = self.cached_reads.take().unwrap_or_default();
368        let builder = self.builder.clone();
369        self.executor.spawn_blocking(Box::pin(async move {
370            // acquire the permit for executing the task
371            let _permit = guard.acquire().await;
372            let args = BuildArguments {
373                client,
374                pool,
375                cached_reads,
376                config: payload_config,
377                cancel,
378                best_payload,
379            };
380            let result = builder.try_build(args);
381            let _ = tx.send(result);
382        }));
383
384        self.pending_block = Some(PendingPayload { _cancel, payload: rx });
385    }
386}
387
388impl<Client, Pool, Tasks, Builder> Future for BasicPayloadJob<Client, Pool, Tasks, Builder>
389where
390    Client: StateProviderFactory + Clone + Unpin + 'static,
391    Pool: TransactionPool + Unpin + 'static,
392    Tasks: TaskSpawner + Clone + 'static,
393    Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
394    <Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
395    <Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
396{
397    type Output = Result<(), PayloadBuilderError>;
398
399    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
400        let this = self.get_mut();
401
402        // check if the deadline is reached
403        if this.deadline.as_mut().poll(cx).is_ready() {
404            trace!(target: "payload_builder", "payload building deadline reached");
405            return Poll::Ready(Ok(()))
406        }
407
408        // check if the interval is reached
409        while this.interval.poll_tick(cx).is_ready() {
410            // start a new job if there is no pending block, we haven't reached the deadline,
411            // and the payload isn't frozen
412            if this.pending_block.is_none() && !this.best_payload.is_frozen() {
413                this.spawn_build_job();
414            }
415        }
416
417        // poll the pending block
418        if let Some(mut fut) = this.pending_block.take() {
419            match fut.poll_unpin(cx) {
420                Poll::Ready(Ok(outcome)) => match outcome {
421                    BuildOutcome::Better { payload, cached_reads } => {
422                        this.cached_reads = Some(cached_reads);
423                        debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
424                        this.best_payload = PayloadState::Best(payload);
425                    }
426                    BuildOutcome::Freeze(payload) => {
427                        debug!(target: "payload_builder", "payload frozen, no further building will occur");
428                        this.best_payload = PayloadState::Frozen(payload);
429                    }
430                    BuildOutcome::Aborted { fees, cached_reads } => {
431                        this.cached_reads = Some(cached_reads);
432                        trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
433                    }
434                    BuildOutcome::Cancelled => {
435                        unreachable!("the cancel signal never fired")
436                    }
437                },
438                Poll::Ready(Err(error)) => {
439                    // job failed, but we simply try again next interval
440                    debug!(target: "payload_builder", %error, "payload build attempt failed");
441                    this.metrics.inc_failed_payload_builds();
442                }
443                Poll::Pending => {
444                    this.pending_block = Some(fut);
445                }
446            }
447        }
448
449        Poll::Pending
450    }
451}
452
453impl<Client, Pool, Tasks, Builder> PayloadJob for BasicPayloadJob<Client, Pool, Tasks, Builder>
454where
455    Client: StateProviderFactory + Clone + Unpin + 'static,
456    Pool: TransactionPool + Unpin + 'static,
457    Tasks: TaskSpawner + Clone + 'static,
458    Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
459    <Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
460    <Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
461{
462    type PayloadAttributes = Builder::Attributes;
463    type ResolvePayloadFuture = ResolveBestPayload<Self::BuiltPayload>;
464    type BuiltPayload = Builder::BuiltPayload;
465
466    fn best_payload(&self) -> Result<Self::BuiltPayload, PayloadBuilderError> {
467        if let Some(payload) = self.best_payload.payload() {
468            Ok(payload.clone())
469        } else {
470            // No payload has been built yet, but we need to return something that the CL then
471            // can deliver, so we need to return an empty payload.
472            //
473            // Note: it is assumed that this is unlikely to happen, as the payload job is
474            // started right away and the first full block should have been
475            // built by the time CL is requesting the payload.
476            self.metrics.inc_requested_empty_payload();
477            self.builder.build_empty_payload(&self.client, self.config.clone())
478        }
479    }
480
481    fn payload_attributes(&self) -> Result<Self::PayloadAttributes, PayloadBuilderError> {
482        Ok(self.config.attributes.clone())
483    }
484
485    fn resolve_kind(
486        &mut self,
487        kind: PayloadKind,
488    ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
489        let best_payload = self.best_payload.payload().cloned();
490        if best_payload.is_none() && self.pending_block.is_none() {
491            // ensure we have a job scheduled if we don't have a best payload yet and none is active
492            self.spawn_build_job();
493        }
494
495        let maybe_better = self.pending_block.take();
496        let mut empty_payload = None;
497
498        if best_payload.is_none() {
499            debug!(target: "payload_builder", id=%self.config.payload_id(), "no best payload yet to resolve, building empty payload");
500
501            let args = BuildArguments {
502                client: self.client.clone(),
503                pool: self.pool.clone(),
504                cached_reads: self.cached_reads.take().unwrap_or_default(),
505                config: self.config.clone(),
506                cancel: Cancelled::default(),
507                best_payload: None,
508            };
509
510            match self.builder.on_missing_payload(args) {
511                MissingPayloadBehaviour::AwaitInProgress => {
512                    debug!(target: "payload_builder", id=%self.config.payload_id(), "awaiting in progress payload build job");
513                }
514                MissingPayloadBehaviour::RaceEmptyPayload => {
515                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing empty payload");
516
517                    // if no payload has been built yet
518                    self.metrics.inc_requested_empty_payload();
519                    // no payload built yet, so we need to return an empty payload
520                    let (tx, rx) = oneshot::channel();
521                    let client = self.client.clone();
522                    let config = self.config.clone();
523                    let builder = self.builder.clone();
524                    self.executor.spawn_blocking(Box::pin(async move {
525                        let res = builder.build_empty_payload(&client, config);
526                        let _ = tx.send(res);
527                    }));
528
529                    empty_payload = Some(rx);
530                }
531                MissingPayloadBehaviour::RacePayload(job) => {
532                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing fallback payload");
533                    // race the in progress job with this job
534                    let (tx, rx) = oneshot::channel();
535                    self.executor.spawn_blocking(Box::pin(async move {
536                        let _ = tx.send(job());
537                    }));
538                    empty_payload = Some(rx);
539                }
540            };
541        }
542
543        let fut = ResolveBestPayload {
544            best_payload,
545            maybe_better,
546            empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending),
547        };
548
549        (fut, KeepPayloadJobAlive::No)
550    }
551}
552
/// Tracks how far the building of a payload has progressed.
#[derive(Debug, Clone)]
pub enum PayloadState<P> {
    /// Nothing has been built so far.
    Missing,
    /// Best payload produced so far; later attempts may still replace it.
    Best(P),
    /// Building has stopped for good.
    ///
    /// Holds the final payload `P` that should be used.
    Frozen(P),
}

impl<P> PayloadState<P> {
    /// Returns `true` only for the `Frozen` state.
    pub const fn is_frozen(&self) -> bool {
        match self {
            Self::Frozen(_) => true,
            Self::Missing | Self::Best(_) => false,
        }
    }

    /// Borrows the contained payload; `Best` and `Frozen` hold one, `Missing` does not.
    pub const fn payload(&self) -> Option<&P> {
        match self {
            Self::Best(inner) | Self::Frozen(inner) => Some(inner),
            Self::Missing => None,
        }
    }
}
580
581/// The future that returns the best payload to be served to the consensus layer.
582///
583/// This returns the payload that's supposed to be sent to the CL.
584///
585/// If payload has been built so far, it will return that, but it will check if there's a better
586/// payload available from an in progress build job. If so it will return that.
587///
588/// If no payload has been built so far, it will either return an empty payload or the result of the
589/// in progress build job, whatever finishes first.
590#[derive(Debug)]
591pub struct ResolveBestPayload<Payload> {
592    /// Best payload so far.
593    pub best_payload: Option<Payload>,
594    /// Regular payload job that's currently running that might produce a better payload.
595    pub maybe_better: Option<PendingPayload<Payload>>,
596    /// The empty payload building job in progress, if any.
597    pub empty_payload: Option<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
598}
599
600impl<Payload> ResolveBestPayload<Payload> {
601    const fn is_empty(&self) -> bool {
602        self.best_payload.is_none() && self.maybe_better.is_none() && self.empty_payload.is_none()
603    }
604}
605
606impl<Payload> Future for ResolveBestPayload<Payload>
607where
608    Payload: Unpin,
609{
610    type Output = Result<Payload, PayloadBuilderError>;
611
612    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
613        let this = self.get_mut();
614
615        // check if there is a better payload before returning the best payload
616        if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() {
617            if let Poll::Ready(res) = fut.poll(cx) {
618                this.maybe_better = None;
619                if let Ok(Some(payload)) = res.map(|out| out.into_payload())
620                    .inspect_err(|err| warn!(target: "payload_builder", %err, "failed to resolve pending payload"))
621                {
622                    debug!(target: "payload_builder", "resolving better payload");
623                    return Poll::Ready(Ok(payload))
624                }
625            }
626        }
627
628        if let Some(best) = this.best_payload.take() {
629            debug!(target: "payload_builder", "resolving best payload");
630            return Poll::Ready(Ok(best))
631        }
632
633        if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() {
634            if let Poll::Ready(res) = fut.poll(cx) {
635                this.empty_payload = None;
636                return match res {
637                    Ok(res) => {
638                        if let Err(err) = &res {
639                            warn!(target: "payload_builder", %err, "failed to resolve empty payload");
640                        } else {
641                            debug!(target: "payload_builder", "resolving empty payload");
642                        }
643                        Poll::Ready(res)
644                    }
645                    Err(err) => Poll::Ready(Err(err.into())),
646                }
647            }
648        }
649
650        if this.is_empty() {
651            return Poll::Ready(Err(PayloadBuilderError::MissingPayload))
652        }
653
654        Poll::Pending
655    }
656}
657
658/// A future that resolves to the result of the block building job.
659#[derive(Debug)]
660pub struct PendingPayload<P> {
661    /// The marker to cancel the job on drop
662    _cancel: Cancelled,
663    /// The channel to send the result to.
664    payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
665}
666
667impl<P> PendingPayload<P> {
668    /// Constructs a `PendingPayload` future.
669    pub const fn new(
670        cancel: Cancelled,
671        payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
672    ) -> Self {
673        Self { _cancel: cancel, payload }
674    }
675}
676
677impl<P> Future for PendingPayload<P> {
678    type Output = Result<BuildOutcome<P>, PayloadBuilderError>;
679
680    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
681        let res = ready!(self.payload.poll_unpin(cx));
682        Poll::Ready(res.map_err(Into::into).and_then(|res| res))
683    }
684}
685
/// Marker for cancelling a job.
///
/// Dropping it sets the `cancelled` flag to true. Clones share the same flag.
#[derive(Default, Clone, Debug)]
pub struct Cancelled(Arc<AtomicBool>);

// === impl Cancelled ===

impl Cancelled {
    /// Reports whether the job has been cancelled, i.e. whether the flag is set.
    pub fn is_cancelled(&self) -> bool {
        use std::sync::atomic::Ordering::Relaxed;

        let Self(flag) = self;
        flag.load(Relaxed)
    }
}

impl Drop for Cancelled {
    /// Raises the shared flag.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering::Relaxed;

        self.0.store(true, Relaxed);
    }
}
706
707/// Static config for how to build a payload.
708#[derive(Clone, Debug)]
709pub struct PayloadConfig<Attributes> {
710    /// The parent header.
711    pub parent_header: Arc<SealedHeader>,
712    /// Requested attributes for the payload.
713    pub attributes: Attributes,
714}
715
716impl<Attributes> PayloadConfig<Attributes>
717where
718    Attributes: PayloadBuilderAttributes,
719{
720    /// Create new payload config.
721    pub const fn new(parent_header: Arc<SealedHeader>, attributes: Attributes) -> Self {
722        Self { parent_header, attributes }
723    }
724
725    /// Returns the payload id.
726    pub fn payload_id(&self) -> PayloadId {
727        self.attributes.payload_id()
728    }
729}
730
731/// The possible outcomes of a payload building attempt.
732#[derive(Debug)]
733pub enum BuildOutcome<Payload> {
734    /// Successfully built a better block.
735    Better {
736        /// The new payload that was built.
737        payload: Payload,
738        /// The cached reads that were used to build the payload.
739        cached_reads: CachedReads,
740    },
741    /// Aborted payload building because resulted in worse block wrt. fees.
742    Aborted {
743        /// The total fees associated with the attempted payload.
744        fees: U256,
745        /// The cached reads that were used to build the payload.
746        cached_reads: CachedReads,
747    },
748    /// Build job was cancelled
749    Cancelled,
750
751    /// The payload is final and no further building should occur
752    Freeze(Payload),
753}
754
755impl<Payload> BuildOutcome<Payload> {
756    /// Consumes the type and returns the payload if the outcome is `Better`.
757    pub fn into_payload(self) -> Option<Payload> {
758        match self {
759            Self::Better { payload, .. } | Self::Freeze(payload) => Some(payload),
760            _ => None,
761        }
762    }
763
764    /// Returns true if the outcome is `Better`.
765    pub const fn is_better(&self) -> bool {
766        matches!(self, Self::Better { .. })
767    }
768
769    /// Returns true if the outcome is `Aborted`.
770    pub const fn is_aborted(&self) -> bool {
771        matches!(self, Self::Aborted { .. })
772    }
773
774    /// Returns true if the outcome is `Cancelled`.
775    pub const fn is_cancelled(&self) -> bool {
776        matches!(self, Self::Cancelled)
777    }
778
779    /// Applies a fn on the current payload.
780    pub(crate) fn map_payload<F, P>(self, f: F) -> BuildOutcome<P>
781    where
782        F: FnOnce(Payload) -> P,
783    {
784        match self {
785            Self::Better { payload, cached_reads } => {
786                BuildOutcome::Better { payload: f(payload), cached_reads }
787            }
788            Self::Aborted { fees, cached_reads } => BuildOutcome::Aborted { fees, cached_reads },
789            Self::Cancelled => BuildOutcome::Cancelled,
790            Self::Freeze(payload) => BuildOutcome::Freeze(f(payload)),
791        }
792    }
793}
794
/// The possible outcomes of a payload building attempt, without the reused [`CachedReads`].
///
/// This mirrors [`BuildOutcome`] variant for variant, but none of the variants carry
/// [`CachedReads`]. Use `with_cached_reads` to attach them and obtain a [`BuildOutcome`].
#[derive(Debug)]
pub enum BuildOutcomeKind<Payload> {
    /// Successfully built a better block.
    Better {
        /// The new payload that was built.
        payload: Payload,
    },
    /// Payload building was aborted because it resulted in a worse block with respect to fees.
    Aborted {
        /// The total fees associated with the attempted payload.
        fees: U256,
    },
    /// The build job was cancelled.
    Cancelled,
    /// The payload is final and no further building should occur.
    Freeze(Payload),
}
813
814impl<Payload> BuildOutcomeKind<Payload> {
815    /// Attaches the [`CachedReads`] to the outcome.
816    pub fn with_cached_reads(self, cached_reads: CachedReads) -> BuildOutcome<Payload> {
817        match self {
818            Self::Better { payload } => BuildOutcome::Better { payload, cached_reads },
819            Self::Aborted { fees } => BuildOutcome::Aborted { fees, cached_reads },
820            Self::Cancelled => BuildOutcome::Cancelled,
821            Self::Freeze(payload) => BuildOutcome::Freeze(payload),
822        }
823    }
824}
825
/// A collection of arguments used for building payloads.
///
/// This struct bundles the components and configuration required for the payload building
/// process: the chain client, the transaction pool, previously cached reads, the payload
/// configuration, the cancellation marker, and the best payload achieved so far.
///
/// All fields are public and stored by value; whether a given field is an owned value or a
/// handle/reference is determined by the type parameters chosen by the caller.
#[derive(Debug)]
pub struct BuildArguments<Pool, Client, Attributes, Payload> {
    /// How to interact with the chain.
    pub client: Client,
    /// The transaction pool.
    ///
    /// Or the type that provides the transactions to build the payload.
    pub pool: Pool,
    /// Previously cached disk reads.
    pub cached_reads: CachedReads,
    /// How to configure the payload.
    pub config: PayloadConfig<Attributes>,
    /// A marker that can be used to cancel the job.
    pub cancel: Cancelled,
    /// The best payload achieved so far, or `None` if there is none yet.
    pub best_payload: Option<Payload>,
}
848
849impl<Pool, Client, Attributes, Payload> BuildArguments<Pool, Client, Attributes, Payload> {
850    /// Create new build arguments.
851    pub const fn new(
852        client: Client,
853        pool: Pool,
854        cached_reads: CachedReads,
855        config: PayloadConfig<Attributes>,
856        cancel: Cancelled,
857        best_payload: Option<Payload>,
858    ) -> Self {
859        Self { client, pool, cached_reads, config, cancel, best_payload }
860    }
861
862    /// Maps the transaction pool to a new type.
863    pub fn with_pool<P>(self, pool: P) -> BuildArguments<P, Client, Attributes, Payload> {
864        BuildArguments {
865            client: self.client,
866            pool,
867            cached_reads: self.cached_reads,
868            config: self.config,
869            cancel: self.cancel,
870            best_payload: self.best_payload,
871        }
872    }
873
874    /// Maps the transaction pool to a new type using a closure with the current pool type as input.
875    pub fn map_pool<F, P>(self, f: F) -> BuildArguments<P, Client, Attributes, Payload>
876    where
877        F: FnOnce(Pool) -> P,
878    {
879        BuildArguments {
880            client: self.client,
881            pool: f(self.pool),
882            cached_reads: self.cached_reads,
883            config: self.config,
884            cancel: self.cancel,
885            best_payload: self.best_payload,
886        }
887    }
888}
889
/// A trait for building payloads that encapsulate Ethereum transactions.
///
/// Implementors provide [`try_build`](Self::try_build) to construct a payload from
/// [`BuildArguments`] and [`build_empty_payload`](Self::build_empty_payload) to construct a
/// payload without transactions. Both report failures as a [`PayloadBuilderError`].
///
/// Generic parameters `Pool` and `Client` represent the transaction pool and
/// Ethereum client types.
pub trait PayloadBuilder<Pool, Client>: Send + Sync + Clone {
    /// The payload attributes type to accept for building.
    type Attributes: PayloadBuilderAttributes;
    /// The type of the built payload.
    type BuiltPayload: BuiltPayload;

    /// Tries to build a transaction payload using provided arguments.
    ///
    /// Constructs a transaction payload based on the given arguments,
    /// returning a `Result` indicating success or an error if building fails.
    ///
    /// # Arguments
    ///
    /// - `args`: Build arguments containing necessary components.
    ///
    /// # Returns
    ///
    /// The [`BuildOutcome`] of the attempt on success.
    ///
    /// # Errors
    ///
    /// Returns a [`PayloadBuilderError`] if building fails.
    fn try_build(
        &self,
        args: BuildArguments<Pool, Client, Self::Attributes, Self::BuiltPayload>,
    ) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError>;

    /// Invoked when the payload job is being resolved and there is no payload yet.
    ///
    /// This can happen if the CL requests a payload before the first payload has been built.
    ///
    /// The default implementation ignores `_args` and returns
    /// [`MissingPayloadBehaviour::RaceEmptyPayload`].
    fn on_missing_payload(
        &self,
        _args: BuildArguments<Pool, Client, Self::Attributes, Self::BuiltPayload>,
    ) -> MissingPayloadBehaviour<Self::BuiltPayload> {
        MissingPayloadBehaviour::RaceEmptyPayload
    }

    /// Builds an empty payload without any transaction.
    ///
    /// # Errors
    ///
    /// Returns a [`PayloadBuilderError`] if the empty payload cannot be built.
    fn build_empty_payload(
        &self,
        client: &Client,
        config: PayloadConfig<Self::Attributes>,
    ) -> Result<Self::BuiltPayload, PayloadBuilderError>;
}
938
/// Tells the payload builder how to react to a payload request if there's no payload available
/// yet.
///
/// This situation can occur if the CL requests a payload before the first payload has been built.
pub enum MissingPayloadBehaviour<Payload> {
    /// Await the regular scheduled payload process.
    AwaitInProgress,
    /// Race the in progress payload process with an empty payload.
    RaceEmptyPayload,
    /// Race the in progress payload process with this job.
    ///
    /// The job is a boxed, sendable one-shot closure that produces either the payload or a
    /// [`PayloadBuilderError`].
    RacePayload(Box<dyn FnOnce() -> Result<Payload, PayloadBuilderError> + Send>),
}
950
951impl<Payload> fmt::Debug for MissingPayloadBehaviour<Payload> {
952    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
953        match self {
954            Self::AwaitInProgress => write!(f, "AwaitInProgress"),
955            Self::RaceEmptyPayload => {
956                write!(f, "RaceEmptyPayload")
957            }
958            Self::RacePayload(_) => write!(f, "RacePayload"),
959        }
960    }
961}
962
impl<Payload> Default for MissingPayloadBehaviour<Payload> {
    /// Returns [`MissingPayloadBehaviour::RaceEmptyPayload`].
    fn default() -> Self {
        Self::RaceEmptyPayload
    }
}
968
969/// Executes the withdrawals and commits them to the _runtime_ Database and `BundleState`.
970///
971/// Returns the withdrawals root.
972///
973/// Returns `None` values pre shanghai
974pub fn commit_withdrawals<DB, ChainSpec>(
975    db: &mut State<DB>,
976    chain_spec: &ChainSpec,
977    timestamp: u64,
978    withdrawals: &Withdrawals,
979) -> Result<Option<B256>, DB::Error>
980where
981    DB: Database,
982    ChainSpec: EthereumHardforks,
983{
984    if !chain_spec.is_shanghai_active_at_timestamp(timestamp) {
985        return Ok(None)
986    }
987
988    if withdrawals.is_empty() {
989        return Ok(Some(EMPTY_WITHDRAWALS))
990    }
991
992    let balance_increments =
993        post_block_withdrawals_balance_increments(chain_spec, timestamp, withdrawals);
994
995    db.increment_balances(balance_increments)?;
996
997    Ok(Some(proofs::calculate_withdrawals_root(withdrawals)))
998}
999
1000/// Checks if the new payload is better than the current best.
1001///
1002/// This compares the total fees of the blocks, higher is better.
1003#[inline(always)]
1004pub fn is_better_payload<T: BuiltPayload>(best_payload: Option<&T>, new_fees: U256) -> bool {
1005    if let Some(best_payload) = best_payload {
1006        new_fees > best_payload.fees()
1007    } else {
1008        true
1009    }
1010}
1011
/// Computes how long it is from now until the given unix timestamp (in seconds).
///
/// Yields `Duration::ZERO` when the timestamp is not in the future. If the system clock reports
/// a time before the unix epoch, the current time is treated as the epoch itself.
fn duration_until(unix_timestamp_secs: u64) -> Duration {
    let elapsed_since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before the epoch: fall back to zero elapsed time.
        Err(_) => Duration::ZERO,
    };
    Duration::from_secs(unix_timestamp_secs).saturating_sub(elapsed_since_epoch)
}