1use alloy_consensus::{BlockHeader, Transaction as _};
4use alloy_primitives::{Keccak256, U256};
5use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
6use jsonrpsee::core::RpcResult;
7use reth_chainspec::EthChainSpec;
8use reth_evm::{ConfigureEvm, ConfigureEvmEnv};
9use reth_primitives::PooledTransactionsElement;
10use reth_primitives_traits::SignedTransaction;
11use reth_provider::{ChainSpecProvider, HeaderProvider};
12use reth_revm::database::StateProviderDatabase;
13use reth_rpc_eth_api::{
14 helpers::{Call, EthTransactions, LoadPendingBlock},
15 EthCallBundleApiServer, FromEthApiError, FromEvmError, RpcNodeCore,
16};
17use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError, RpcInvalidTransactionError};
18use reth_tasks::pool::BlockingTaskGuard;
19use reth_transaction_pool::{PoolConsensusTx, PoolPooledTx, PoolTransaction, TransactionPool};
20use revm::{
21 db::{CacheDB, DatabaseCommit, DatabaseRef},
22 primitives::{ResultAndState, TxEnv},
23};
24use revm_primitives::{EnvKzgSettings, EnvWithHandlerCfg, SpecId, MAX_BLOB_GAS_PER_BLOCK};
25use std::sync::Arc;
26
/// Handle used to simulate transaction bundles on top of an `Eth` API implementation.
///
/// All state lives behind an [`Arc`], so clones of this handle share the same inner value.
pub struct EthBundle<Eth> {
    /// Reference-counted state shared by every clone of this handle.
    inner: Arc<EthBundleInner<Eth>>,
}
32
33impl<Eth> EthBundle<Eth> {
34 pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
36 Self { inner: Arc::new(EthBundleInner { eth_api, blocking_task_guard }) }
37 }
38
39 pub fn eth_api(&self) -> &Eth {
41 &self.inner.eth_api
42 }
43}
44
45impl<Eth> EthBundle<Eth>
46where
47 Eth: EthTransactions<
48 Pool: TransactionPool<
49 Transaction: PoolTransaction<
50 Consensus: From<PooledTransactionsElement>,
51 Pooled = PooledTransactionsElement,
52 >,
53 >,
54 > + LoadPendingBlock
55 + Call
56 + 'static,
57{
58 pub async fn call_bundle(
63 &self,
64 bundle: EthCallBundle,
65 ) -> Result<EthCallBundleResponse, Eth::Error> {
66 let EthCallBundle {
67 txs,
68 block_number,
69 coinbase,
70 state_block_number,
71 timeout: _,
72 timestamp,
73 gas_limit,
74 difficulty,
75 base_fee,
76 ..
77 } = bundle;
78 if txs.is_empty() {
79 return Err(EthApiError::InvalidParams(
80 EthBundleError::EmptyBundleTransactions.to_string(),
81 )
82 .into())
83 }
84 if block_number == 0 {
85 return Err(EthApiError::InvalidParams(
86 EthBundleError::BundleMissingBlockNumber.to_string(),
87 )
88 .into())
89 }
90
91 let transactions = txs
92 .into_iter()
93 .map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
94 .collect::<Result<Vec<_>, _>>()?
95 .into_iter()
96 .map(|tx| tx.to_components())
97 .collect::<Vec<_>>();
98
99 if transactions
102 .iter()
103 .filter_map(|(tx, _)| {
104 if let PooledTransactionsElement::BlobTransaction(tx) = tx {
105 Some(tx.tx().tx().blob_gas())
106 } else {
107 None
108 }
109 })
110 .sum::<u64>() >
111 MAX_BLOB_GAS_PER_BLOCK
112 {
113 return Err(EthApiError::InvalidParams(
114 EthBundleError::Eip4844BlobGasExceeded.to_string(),
115 )
116 .into())
117 }
118
119 let block_id: alloy_rpc_types_eth::BlockId = state_block_number.into();
120 let (cfg, mut block_env, at) = self.eth_api().evm_env_at(block_id).await?;
122
123 if let Some(coinbase) = coinbase {
124 block_env.coinbase = coinbase;
125 }
126
127 if let Some(timestamp) = timestamp {
129 block_env.timestamp = U256::from(timestamp);
130 } else {
131 block_env.timestamp += U256::from(12);
132 }
133
134 if let Some(difficulty) = difficulty {
135 block_env.difficulty = U256::from(difficulty);
136 }
137
138 block_env.gas_limit = U256::from(self.inner.eth_api.call_gas_limit());
140 if let Some(gas_limit) = gas_limit {
141 let gas_limit = U256::from(gas_limit);
142 if gas_limit > block_env.gas_limit {
143 return Err(
144 EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh).into()
145 )
146 }
147 block_env.gas_limit = gas_limit;
148 }
149
150 if let Some(base_fee) = base_fee {
151 block_env.basefee = U256::from(base_fee);
152 } else if cfg.handler_cfg.spec_id.is_enabled_in(SpecId::LONDON) {
153 let parent_block = block_env.number.saturating_to::<u64>();
154 let parent = RpcNodeCore::provider(self.eth_api())
156 .header_by_number(parent_block)
157 .map_err(Eth::Error::from_eth_err)?
158 .ok_or(EthApiError::HeaderNotFound(parent_block.into()))?;
159 if let Some(base_fee) = parent.next_block_base_fee(
160 RpcNodeCore::provider(self.eth_api())
161 .chain_spec()
162 .base_fee_params_at_block(parent_block),
163 ) {
164 block_env.basefee = U256::from(base_fee);
165 }
166 }
167
168 let state_block_number = block_env.number;
169 block_env.number = U256::from(block_number);
171
172 let eth_api = self.eth_api().clone();
173
174 self.eth_api()
175 .spawn_with_state_at_block(at, move |state| {
176 let coinbase = block_env.coinbase;
177 let basefee = Some(block_env.basefee.to::<u64>());
178 let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, TxEnv::default());
179 let db = CacheDB::new(StateProviderDatabase::new(state));
180
181 let initial_coinbase = db
182 .basic_ref(coinbase)
183 .map_err(Eth::Error::from_eth_err)?
184 .map(|acc| acc.balance)
185 .unwrap_or_default();
186 let mut coinbase_balance_before_tx = initial_coinbase;
187 let mut coinbase_balance_after_tx = initial_coinbase;
188 let mut total_gas_used = 0u64;
189 let mut total_gas_fess = U256::ZERO;
190 let mut hasher = Keccak256::new();
191
192 let mut evm = eth_api.evm_config().evm_with_env(db, env);
193
194 let mut results = Vec::with_capacity(transactions.len());
195 let mut transactions = transactions.into_iter().peekable();
196
197 while let Some((tx, signer)) = transactions.next() {
198 if let PooledTransactionsElement::BlobTransaction(ref tx) = tx {
201 tx.tx().validate_blob(EnvKzgSettings::Default.get()).map_err(|e| {
202 Eth::Error::from_eth_err(EthApiError::InvalidParams(e.to_string()))
203 })?;
204 }
205
206 let tx: PoolConsensusTx<Eth::Pool> = tx.into();
207
208 hasher.update(*tx.tx_hash());
209 let gas_price = tx.effective_gas_price(basefee);
210 eth_api.evm_config().fill_tx_env(evm.tx_mut(), &tx, signer).map_err(|_| {
211 Eth::Error::from_eth_err(EthApiError::FailedToDecodeSignedTransaction)
212 })?;
213 let ResultAndState { result, state } =
214 evm.transact().map_err(Eth::Error::from_evm_err)?;
215
216 let gas_used = result.gas_used();
217 total_gas_used += gas_used;
218
219 let gas_fees = U256::from(gas_used) * U256::from(gas_price);
220 total_gas_fess += gas_fees;
221
222 coinbase_balance_after_tx =
224 state.get(&coinbase).map(|acc| acc.info.balance).unwrap_or_default();
225 let coinbase_diff =
226 coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
227 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(gas_fees);
228
229 coinbase_balance_before_tx = coinbase_balance_after_tx;
231
232 let (value, revert) = if result.is_success() {
234 let value = result.into_output().unwrap_or_default();
235 (Some(value), None)
236 } else {
237 let revert = result.into_output().unwrap_or_default();
238 (None, Some(revert))
239 };
240
241 let tx_res = EthCallBundleTransactionResult {
242 coinbase_diff,
243 eth_sent_to_coinbase,
244 from_address: signer,
245 gas_fees,
246 gas_price: U256::from(gas_price),
247 gas_used,
248 to_address: tx.to(),
249 tx_hash: *tx.tx_hash(),
250 value,
251 revert,
252 };
253 results.push(tx_res);
254
255 if transactions.peek().is_some() {
258 evm.context.evm.db.commit(state)
261 }
262 }
263
264 let coinbase_diff = coinbase_balance_after_tx.saturating_sub(initial_coinbase);
267 let eth_sent_to_coinbase = coinbase_diff.saturating_sub(total_gas_fess);
268 let bundle_gas_price =
269 coinbase_diff.checked_div(U256::from(total_gas_used)).unwrap_or_default();
270 let res = EthCallBundleResponse {
271 bundle_gas_price,
272 bundle_hash: hasher.finalize(),
273 coinbase_diff,
274 eth_sent_to_coinbase,
275 gas_fees: total_gas_fess,
276 results,
277 state_block_number: state_block_number.to(),
278 total_gas_used,
279 };
280
281 Ok(res)
282 })
283 .await
284 }
285}
286
287#[async_trait::async_trait]
288impl<Eth> EthCallBundleApiServer for EthBundle<Eth>
289where
290 Eth: EthTransactions<
291 Pool: TransactionPool<Transaction: PoolTransaction<Pooled = PooledTransactionsElement>>,
292 > + LoadPendingBlock
293 + Call
294 + 'static,
295{
296 async fn call_bundle(&self, request: EthCallBundle) -> RpcResult<EthCallBundleResponse> {
297 self.call_bundle(request).await.map_err(Into::into)
298 }
299}
300
/// State shared by all clones of an [`EthBundle`] handle.
#[derive(Debug)]
struct EthBundleInner<Eth> {
    /// The `Eth` API implementation that bundle simulation is run against.
    eth_api: Eth,
    /// Guard handed over at construction time. Nothing in the visible code reads it, hence the
    /// `dead_code` allowance.
    /// NOTE(review): presumably meant to bound concurrent blocking tasks — confirm.
    #[allow(dead_code)]
    blocking_task_guard: BlockingTaskGuard,
}
310
311impl<Eth> std::fmt::Debug for EthBundle<Eth> {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 f.debug_struct("EthBundle").finish_non_exhaustive()
314 }
315}
316
317impl<Eth> Clone for EthBundle<Eth> {
318 fn clone(&self) -> Self {
319 Self { inner: Arc::clone(&self.inner) }
320 }
321}
322
/// Validation failures for a bundle request.
///
/// `EthBundle::call_bundle` renders these via `to_string()` into `EthApiError::InvalidParams`.
#[derive(Debug, thiserror::Error)]
pub enum EthBundleError {
    /// The bundle's transaction list is empty.
    #[error("bundle missing txs")]
    EmptyBundleTransactions,
    /// The bundle's block number is zero, which is reported as missing.
    #[error("bundle missing blockNumber")]
    BundleMissingBlockNumber,
    /// The summed blob gas of the bundle's blob transactions exceeds `MAX_BLOB_GAS_PER_BLOCK`.
    #[error("blob gas usage exceeds the limit of {MAX_BLOB_GAS_PER_BLOCK} gas per block.")]
    Eip4844BlobGasExceeded,
}