reth_rpc_eth_api/helpers/
pending_block.rs1use super::SpawnBlocking;
5use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
6use alloy_consensus::{BlockHeader, Transaction};
7use alloy_eips::eip4844::MAX_DATA_GAS_PER_BLOCK;
8use alloy_network::Network;
9use alloy_primitives::B256;
10use alloy_rpc_types_eth::BlockNumberOrTag;
11use futures::Future;
12use reth_chainspec::{EthChainSpec, EthereumHardforks};
13use reth_errors::RethError;
14use reth_evm::{
15 state_change::post_block_withdrawals_balance_increments, system_calls::SystemCaller,
16 ConfigureEvm, ConfigureEvmEnv, NextBlockEnvAttributes,
17};
18use reth_primitives::{BlockExt, InvalidTransactionError, SealedBlockWithSenders};
19use reth_primitives_traits::Receipt;
20use reth_provider::{
21 BlockReader, BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, ProviderBlock, ProviderError,
22 ProviderHeader, ProviderReceipt, ProviderTx, ReceiptProvider, StateProviderFactory,
23};
24use reth_revm::{
25 database::StateProviderDatabase,
26 primitives::{
27 BlockEnv, CfgEnvWithHandlerCfg, EVMError, Env, ExecutionResult, InvalidTransaction,
28 ResultAndState,
29 },
30};
31use reth_rpc_eth_types::{EthApiError, PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin};
32use reth_transaction_pool::{
33 error::InvalidPoolTransactionError, BestTransactionsAttributes, PoolTransaction,
34 TransactionPool,
35};
36use revm::{db::states::bundle_state::BundleRetention, DatabaseCommit, State};
37use std::time::{Duration, Instant};
38use tokio::sync::Mutex;
39use tracing::debug;
40
41pub trait LoadPendingBlock:
45 EthApiTypes<
46 NetworkTypes: Network<
47 HeaderResponse = alloy_rpc_types_eth::Header<ProviderHeader<Self::Provider>>,
48 >,
49 > + RpcNodeCore<
50 Provider: BlockReaderIdExt<Receipt: Receipt>
51 + EvmEnvProvider<ProviderHeader<Self::Provider>>
52 + ChainSpecProvider<ChainSpec: EthChainSpec + EthereumHardforks>
53 + StateProviderFactory,
54 Pool: TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
55 Evm: ConfigureEvm<
56 Header = ProviderHeader<Self::Provider>,
57 Transaction = ProviderTx<Self::Provider>,
58 >,
59 >
60{
61 #[expect(clippy::type_complexity)]
65 fn pending_block(
66 &self,
67 ) -> &Mutex<Option<PendingBlock<ProviderBlock<Self::Provider>, ProviderReceipt<Self::Provider>>>>;
68
69 #[expect(clippy::type_complexity)]
73 fn pending_block_env_and_cfg(
74 &self,
75 ) -> Result<
76 PendingBlockEnv<ProviderBlock<Self::Provider>, ProviderReceipt<Self::Provider>>,
77 Self::Error,
78 > {
79 if let Some(block) =
80 self.provider().pending_block_with_senders().map_err(Self::Error::from_eth_err)?
81 {
82 if let Some(receipts) = self
83 .provider()
84 .receipts_by_block(block.hash().into())
85 .map_err(Self::Error::from_eth_err)?
86 {
87 let (cfg, block_env) = self
91 .provider()
92 .env_with_header(block.header(), self.evm_config().clone())
93 .map_err(Self::Error::from_eth_err)?;
94
95 return Ok(PendingBlockEnv::new(
96 cfg,
97 block_env,
98 PendingBlockEnvOrigin::ActualPending(block, receipts),
99 ));
100 }
101 }
102
103 let latest = self
106 .provider()
107 .latest_header()
108 .map_err(Self::Error::from_eth_err)?
109 .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
110
111 let (cfg, block_env) = self
112 .evm_config()
113 .next_cfg_and_block_env(
114 &latest,
115 NextBlockEnvAttributes {
116 timestamp: latest.timestamp() + 12,
117 suggested_fee_recipient: latest.beneficiary(),
118 prev_randao: B256::random(),
119 },
120 )
121 .map_err(RethError::other)
122 .map_err(Self::Error::from_eth_err)?;
123
124 Ok(PendingBlockEnv::new(
125 cfg,
126 block_env,
127 PendingBlockEnvOrigin::DerivedFromLatest(latest.hash()),
128 ))
129 }
130
131 #[expect(clippy::type_complexity)]
133 fn local_pending_block(
134 &self,
135 ) -> impl Future<
136 Output = Result<
137 Option<(
138 SealedBlockWithSenders<<Self::Provider as BlockReader>::Block>,
139 Vec<ProviderReceipt<Self::Provider>>,
140 )>,
141 Self::Error,
142 >,
143 > + Send
144 where
145 Self: SpawnBlocking,
146 {
147 async move {
148 let pending = self.pending_block_env_and_cfg()?;
149 let parent_hash = match pending.origin {
150 PendingBlockEnvOrigin::ActualPending(block, receipts) => {
151 return Ok(Some((block, receipts)));
152 }
153 PendingBlockEnvOrigin::DerivedFromLatest(parent_hash) => parent_hash,
154 };
155
156 let mut lock = self.pending_block().lock().await;
158
159 let now = Instant::now();
160
161 if let Some(pending_block) = lock.as_ref() {
163 if pending.block_env.number.to::<u64>() == pending_block.block.number() &&
165 parent_hash == pending_block.block.parent_hash() &&
166 now <= pending_block.expires_at
167 {
168 return Ok(Some((pending_block.block.clone(), pending_block.receipts.clone())));
169 }
170 }
171
172 let (sealed_block, receipts) = match self
174 .spawn_blocking_io(move |this| {
175 this.build_block(pending.cfg, pending.block_env, parent_hash)
177 })
178 .await
179 {
180 Ok(block) => block,
181 Err(err) => {
182 debug!(target: "rpc", "Failed to build pending block: {:?}", err);
183 return Ok(None)
184 }
185 };
186
187 let now = Instant::now();
188 *lock = Some(PendingBlock::new(
189 now + Duration::from_secs(1),
190 sealed_block.clone(),
191 receipts.clone(),
192 ));
193
194 Ok(Some((sealed_block, receipts)))
195 }
196 }
197
198 fn assemble_receipt(
200 &self,
201 tx: &ProviderTx<Self::Provider>,
202 result: ExecutionResult,
203 cumulative_gas_used: u64,
204 ) -> ProviderReceipt<Self::Provider>;
205
206 fn assemble_block(
208 &self,
209 block_env: &BlockEnv,
210 parent_hash: revm_primitives::B256,
211 state_root: revm_primitives::B256,
212 transactions: Vec<ProviderTx<Self::Provider>>,
213 receipts: &[ProviderReceipt<Self::Provider>],
214 ) -> ProviderBlock<Self::Provider>;
215
216 fn assemble_block_and_receipts(
218 &self,
219 block_env: &BlockEnv,
220 parent_hash: revm_primitives::B256,
221 state_root: revm_primitives::B256,
222 transactions: Vec<ProviderTx<Self::Provider>>,
223 results: Vec<ExecutionResult>,
224 ) -> (ProviderBlock<Self::Provider>, Vec<ProviderReceipt<Self::Provider>>) {
225 let mut cumulative_gas_used = 0;
226 let mut receipts = Vec::with_capacity(results.len());
227
228 for (tx, outcome) in transactions.iter().zip(results) {
229 cumulative_gas_used += outcome.gas_used();
230 receipts.push(self.assemble_receipt(tx, outcome, cumulative_gas_used));
231 }
232
233 let block =
234 self.assemble_block(block_env, parent_hash, state_root, transactions, &receipts);
235
236 (block, receipts)
237 }
238
239 #[expect(clippy::type_complexity)]
246 fn build_block(
247 &self,
248 cfg: CfgEnvWithHandlerCfg,
249 block_env: BlockEnv,
250 parent_hash: B256,
251 ) -> Result<
252 (
253 SealedBlockWithSenders<ProviderBlock<Self::Provider>>,
254 Vec<ProviderReceipt<Self::Provider>>,
255 ),
256 Self::Error,
257 >
258 where
259 EthApiError: From<ProviderError>,
260 {
261 let state_provider = self
262 .provider()
263 .history_by_block_hash(parent_hash)
264 .map_err(Self::Error::from_eth_err)?;
265 let state = StateProviderDatabase::new(state_provider);
266 let mut db = State::builder().with_database(state).with_bundle_update().build();
267
268 let mut cumulative_gas_used = 0;
269 let mut sum_blob_gas_used = 0;
270 let block_gas_limit: u64 = block_env.gas_limit.to::<u64>();
271 let base_fee = block_env.basefee.to::<u64>();
272
273 let mut executed_txs = Vec::new();
274 let mut senders = Vec::new();
275 let mut best_txs =
276 self.pool().best_transactions_with_attributes(BestTransactionsAttributes::new(
277 base_fee,
278 block_env.get_blob_gasprice().map(|gasprice| gasprice as u64),
279 ));
280
281 let chain_spec = self.provider().chain_spec();
282
283 let mut system_caller = SystemCaller::new(self.evm_config().clone(), chain_spec.clone());
284
285 system_caller
286 .pre_block_blockhashes_contract_call(&mut db, &cfg, &block_env, parent_hash)
287 .map_err(|err| EthApiError::Internal(err.into()))?;
288
289 let mut results = Vec::new();
290
291 while let Some(pool_tx) = best_txs.next() {
292 if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
294 best_txs.mark_invalid(
298 &pool_tx,
299 InvalidPoolTransactionError::ExceedsGasLimit(
300 pool_tx.gas_limit(),
301 block_gas_limit,
302 ),
303 );
304 continue
305 }
306
307 if pool_tx.origin.is_private() {
308 best_txs.mark_invalid(
312 &pool_tx,
313 InvalidPoolTransactionError::Consensus(
314 InvalidTransactionError::TxTypeNotSupported,
315 ),
316 );
317 continue
318 }
319
320 let tx = pool_tx.to_consensus();
322
323 if let Some(tx_blob_gas) = tx.blob_gas_used() {
326 if sum_blob_gas_used + tx_blob_gas > MAX_DATA_GAS_PER_BLOCK {
327 best_txs.mark_invalid(
332 &pool_tx,
333 InvalidPoolTransactionError::ExceedsGasLimit(
334 tx_blob_gas,
335 MAX_DATA_GAS_PER_BLOCK,
336 ),
337 );
338 continue
339 }
340 }
341
342 let env = Env::boxed(
344 cfg.cfg_env.clone(),
345 block_env.clone(),
346 Self::evm_config(self)
347 .tx_env(tx.as_signed(), tx.signer())
348 .map_err(|_| EthApiError::FailedToDecodeSignedTransaction)?,
349 );
350
351 let mut evm = revm::Evm::builder().with_env(env).with_db(&mut db).build();
352
353 let ResultAndState { result, state } = match evm.transact() {
354 Ok(res) => res,
355 Err(err) => {
356 match err {
357 EVMError::Transaction(err) => {
358 if matches!(err, InvalidTransaction::NonceTooLow { .. }) {
359 } else {
361 best_txs.mark_invalid(
364 &pool_tx,
365 InvalidPoolTransactionError::Consensus(
366 InvalidTransactionError::TxTypeNotSupported,
367 ),
368 );
369 }
370 continue
371 }
372 err => {
373 return Err(Self::Error::from_evm_err(err))
375 }
376 }
377 }
378 };
379 drop(evm);
381 db.commit(state);
383
384 if let Some(tx_blob_gas) = tx.blob_gas_used() {
386 sum_blob_gas_used += tx_blob_gas;
387
388 if sum_blob_gas_used == MAX_DATA_GAS_PER_BLOCK {
390 best_txs.skip_blobs();
391 }
392 }
393
394 let gas_used = result.gas_used();
395
396 cumulative_gas_used += gas_used;
398
399 let (tx, sender) = tx.to_components();
401 executed_txs.push(tx);
402 senders.push(sender);
403 results.push(result);
404 }
405
406 let balance_increments = post_block_withdrawals_balance_increments(
408 chain_spec.as_ref(),
409 block_env.timestamp.try_into().unwrap_or(u64::MAX),
410 &[],
411 );
412
413 db.increment_balances(balance_increments).map_err(Self::Error::from_eth_err)?;
415
416 db.merge_transitions(BundleRetention::PlainState);
418
419 let bundle_state = db.take_bundle();
420 let hashed_state = db.database.hashed_post_state(&bundle_state);
421
422 let state_root = db.database.state_root(hashed_state).map_err(Self::Error::from_eth_err)?;
424
425 let (block, receipts) = self.assemble_block_and_receipts(
426 &block_env,
427 parent_hash,
428 state_root,
429 executed_txs,
430 results,
431 );
432
433 Ok((SealedBlockWithSenders { block: block.seal_slow(), senders }, receipts))
434 }
435}