reth_rpc/eth/
bundle.rs

1//! `Eth` bundle implementation and helpers.
2
3use alloy_consensus::{BlockHeader, Transaction as _};
4use alloy_primitives::{Keccak256, U256};
5use alloy_rpc_types_mev::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult};
6use jsonrpsee::core::RpcResult;
7use reth_chainspec::EthChainSpec;
8use reth_evm::{ConfigureEvm, ConfigureEvmEnv};
9use reth_primitives::PooledTransactionsElement;
10use reth_primitives_traits::SignedTransaction;
11use reth_provider::{ChainSpecProvider, HeaderProvider};
12use reth_revm::database::StateProviderDatabase;
13use reth_rpc_eth_api::{
14    helpers::{Call, EthTransactions, LoadPendingBlock},
15    EthCallBundleApiServer, FromEthApiError, FromEvmError, RpcNodeCore,
16};
17use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError, RpcInvalidTransactionError};
18use reth_tasks::pool::BlockingTaskGuard;
19use reth_transaction_pool::{PoolConsensusTx, PoolPooledTx, PoolTransaction, TransactionPool};
20use revm::{
21    db::{CacheDB, DatabaseCommit, DatabaseRef},
22    primitives::{ResultAndState, TxEnv},
23};
24use revm_primitives::{EnvKzgSettings, EnvWithHandlerCfg, SpecId, MAX_BLOB_GAS_PER_BLOCK};
25use std::sync::Arc;
26
27/// `Eth` bundle implementation.
28pub struct EthBundle<Eth> {
29    /// All nested fields bundled together.
30    inner: Arc<EthBundleInner<Eth>>,
31}
32
33impl<Eth> EthBundle<Eth> {
34    /// Create a new `EthBundle` instance.
35    pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
36        Self { inner: Arc::new(EthBundleInner { eth_api, blocking_task_guard }) }
37    }
38
39    /// Access the underlying `Eth` API.
40    pub fn eth_api(&self) -> &Eth {
41        &self.inner.eth_api
42    }
43}
44
45impl<Eth> EthBundle<Eth>
46where
47    Eth: EthTransactions<
48            Pool: TransactionPool<
49                Transaction: PoolTransaction<
50                    Consensus: From<PooledTransactionsElement>,
51                    Pooled = PooledTransactionsElement,
52                >,
53            >,
54        > + LoadPendingBlock
55        + Call
56        + 'static,
57{
58    /// Simulates a bundle of transactions at the top of a given block number with the state of
59    /// another (or the same) block. This can be used to simulate future blocks with the current
60    /// state, or it can be used to simulate a past block. The sender is responsible for signing the
61    /// transactions and using the correct nonce and ensuring validity
62    pub async fn call_bundle(
63        &self,
64        bundle: EthCallBundle,
65    ) -> Result<EthCallBundleResponse, Eth::Error> {
66        let EthCallBundle {
67            txs,
68            block_number,
69            coinbase,
70            state_block_number,
71            timeout: _,
72            timestamp,
73            gas_limit,
74            difficulty,
75            base_fee,
76            ..
77        } = bundle;
78        if txs.is_empty() {
79            return Err(EthApiError::InvalidParams(
80                EthBundleError::EmptyBundleTransactions.to_string(),
81            )
82            .into())
83        }
84        if block_number == 0 {
85            return Err(EthApiError::InvalidParams(
86                EthBundleError::BundleMissingBlockNumber.to_string(),
87            )
88            .into())
89        }
90
91        let transactions = txs
92            .into_iter()
93            .map(|tx| recover_raw_transaction::<PoolPooledTx<Eth::Pool>>(&tx))
94            .collect::<Result<Vec<_>, _>>()?
95            .into_iter()
96            .map(|tx| tx.to_components())
97            .collect::<Vec<_>>();
98
99        // Validate that the bundle does not contain more than MAX_BLOB_NUMBER_PER_BLOCK blob
100        // transactions.
101        if transactions
102            .iter()
103            .filter_map(|(tx, _)| {
104                if let PooledTransactionsElement::BlobTransaction(tx) = tx {
105                    Some(tx.tx().tx().blob_gas())
106                } else {
107                    None
108                }
109            })
110            .sum::<u64>() >
111            MAX_BLOB_GAS_PER_BLOCK
112        {
113            return Err(EthApiError::InvalidParams(
114                EthBundleError::Eip4844BlobGasExceeded.to_string(),
115            )
116            .into())
117        }
118
119        let block_id: alloy_rpc_types_eth::BlockId = state_block_number.into();
120        // Note: the block number is considered the `parent` block: <https://github.com/flashbots/mev-geth/blob/fddf97beec5877483f879a77b7dea2e58a58d653/internal/ethapi/api.go#L2104>
121        let (cfg, mut block_env, at) = self.eth_api().evm_env_at(block_id).await?;
122
123        if let Some(coinbase) = coinbase {
124            block_env.coinbase = coinbase;
125        }
126
127        // need to adjust the timestamp for the next block
128        if let Some(timestamp) = timestamp {
129            block_env.timestamp = U256::from(timestamp);
130        } else {
131            block_env.timestamp += U256::from(12);
132        }
133
134        if let Some(difficulty) = difficulty {
135            block_env.difficulty = U256::from(difficulty);
136        }
137
138        // default to call gas limit unless user requests a smaller limit
139        block_env.gas_limit = U256::from(self.inner.eth_api.call_gas_limit());
140        if let Some(gas_limit) = gas_limit {
141            let gas_limit = U256::from(gas_limit);
142            if gas_limit > block_env.gas_limit {
143                return Err(
144                    EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh).into()
145                )
146            }
147            block_env.gas_limit = gas_limit;
148        }
149
150        if let Some(base_fee) = base_fee {
151            block_env.basefee = U256::from(base_fee);
152        } else if cfg.handler_cfg.spec_id.is_enabled_in(SpecId::LONDON) {
153            let parent_block = block_env.number.saturating_to::<u64>();
154            // here we need to fetch the _next_ block's basefee based on the parent block <https://github.com/flashbots/mev-geth/blob/fddf97beec5877483f879a77b7dea2e58a58d653/internal/ethapi/api.go#L2130>
155            let parent = RpcNodeCore::provider(self.eth_api())
156                .header_by_number(parent_block)
157                .map_err(Eth::Error::from_eth_err)?
158                .ok_or(EthApiError::HeaderNotFound(parent_block.into()))?;
159            if let Some(base_fee) = parent.next_block_base_fee(
160                RpcNodeCore::provider(self.eth_api())
161                    .chain_spec()
162                    .base_fee_params_at_block(parent_block),
163            ) {
164                block_env.basefee = U256::from(base_fee);
165            }
166        }
167
168        let state_block_number = block_env.number;
169        // use the block number of the request
170        block_env.number = U256::from(block_number);
171
172        let eth_api = self.eth_api().clone();
173
174        self.eth_api()
175            .spawn_with_state_at_block(at, move |state| {
176                let coinbase = block_env.coinbase;
177                let basefee = Some(block_env.basefee.to::<u64>());
178                let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, TxEnv::default());
179                let db = CacheDB::new(StateProviderDatabase::new(state));
180
181                let initial_coinbase = db
182                    .basic_ref(coinbase)
183                    .map_err(Eth::Error::from_eth_err)?
184                    .map(|acc| acc.balance)
185                    .unwrap_or_default();
186                let mut coinbase_balance_before_tx = initial_coinbase;
187                let mut coinbase_balance_after_tx = initial_coinbase;
188                let mut total_gas_used = 0u64;
189                let mut total_gas_fess = U256::ZERO;
190                let mut hasher = Keccak256::new();
191
192                let mut evm = eth_api.evm_config().evm_with_env(db, env);
193
194                let mut results = Vec::with_capacity(transactions.len());
195                let mut transactions = transactions.into_iter().peekable();
196
197                while let Some((tx, signer)) = transactions.next() {
198                    // Verify that the given blob data, commitments, and proofs are all valid for
199                    // this transaction.
200                    if let PooledTransactionsElement::BlobTransaction(ref tx) = tx {
201                        tx.tx().validate_blob(EnvKzgSettings::Default.get()).map_err(|e| {
202                            Eth::Error::from_eth_err(EthApiError::InvalidParams(e.to_string()))
203                        })?;
204                    }
205
206                    let tx: PoolConsensusTx<Eth::Pool> = tx.into();
207
208                    hasher.update(*tx.tx_hash());
209                    let gas_price = tx.effective_gas_price(basefee);
210                    eth_api.evm_config().fill_tx_env(evm.tx_mut(), &tx, signer).map_err(|_| {
211                        Eth::Error::from_eth_err(EthApiError::FailedToDecodeSignedTransaction)
212                    })?;
213                    let ResultAndState { result, state } =
214                        evm.transact().map_err(Eth::Error::from_evm_err)?;
215
216                    let gas_used = result.gas_used();
217                    total_gas_used += gas_used;
218
219                    let gas_fees = U256::from(gas_used) * U256::from(gas_price);
220                    total_gas_fess += gas_fees;
221
222                    // coinbase is always present in the result state
223                    coinbase_balance_after_tx =
224                        state.get(&coinbase).map(|acc| acc.info.balance).unwrap_or_default();
225                    let coinbase_diff =
226                        coinbase_balance_after_tx.saturating_sub(coinbase_balance_before_tx);
227                    let eth_sent_to_coinbase = coinbase_diff.saturating_sub(gas_fees);
228
229                    // update the coinbase balance
230                    coinbase_balance_before_tx = coinbase_balance_after_tx;
231
232                    // set the return data for the response
233                    let (value, revert) = if result.is_success() {
234                        let value = result.into_output().unwrap_or_default();
235                        (Some(value), None)
236                    } else {
237                        let revert = result.into_output().unwrap_or_default();
238                        (None, Some(revert))
239                    };
240
241                    let tx_res = EthCallBundleTransactionResult {
242                        coinbase_diff,
243                        eth_sent_to_coinbase,
244                        from_address: signer,
245                        gas_fees,
246                        gas_price: U256::from(gas_price),
247                        gas_used,
248                        to_address: tx.to(),
249                        tx_hash: *tx.tx_hash(),
250                        value,
251                        revert,
252                    };
253                    results.push(tx_res);
254
255                    // need to apply the state changes of this call before executing the
256                    // next call
257                    if transactions.peek().is_some() {
258                        // need to apply the state changes of this call before executing
259                        // the next call
260                        evm.context.evm.db.commit(state)
261                    }
262                }
263
264                // populate the response
265
266                let coinbase_diff = coinbase_balance_after_tx.saturating_sub(initial_coinbase);
267                let eth_sent_to_coinbase = coinbase_diff.saturating_sub(total_gas_fess);
268                let bundle_gas_price =
269                    coinbase_diff.checked_div(U256::from(total_gas_used)).unwrap_or_default();
270                let res = EthCallBundleResponse {
271                    bundle_gas_price,
272                    bundle_hash: hasher.finalize(),
273                    coinbase_diff,
274                    eth_sent_to_coinbase,
275                    gas_fees: total_gas_fess,
276                    results,
277                    state_block_number: state_block_number.to(),
278                    total_gas_used,
279                };
280
281                Ok(res)
282            })
283            .await
284    }
285}
286
287#[async_trait::async_trait]
288impl<Eth> EthCallBundleApiServer for EthBundle<Eth>
289where
290    Eth: EthTransactions<
291            Pool: TransactionPool<Transaction: PoolTransaction<Pooled = PooledTransactionsElement>>,
292        > + LoadPendingBlock
293        + Call
294        + 'static,
295{
296    async fn call_bundle(&self, request: EthCallBundle) -> RpcResult<EthCallBundleResponse> {
297        self.call_bundle(request).await.map_err(Into::into)
298    }
299}
300
301/// Container type for  `EthBundle` internals
302#[derive(Debug)]
303struct EthBundleInner<Eth> {
304    /// Access to commonly used code of the `eth` namespace
305    eth_api: Eth,
306    // restrict the number of concurrent tracing calls.
307    #[allow(dead_code)]
308    blocking_task_guard: BlockingTaskGuard,
309}
310
311impl<Eth> std::fmt::Debug for EthBundle<Eth> {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        f.debug_struct("EthBundle").finish_non_exhaustive()
314    }
315}
316
317impl<Eth> Clone for EthBundle<Eth> {
318    fn clone(&self) -> Self {
319        Self { inner: Arc::clone(&self.inner) }
320    }
321}
322
323/// [`EthBundle`] specific errors.
324#[derive(Debug, thiserror::Error)]
325pub enum EthBundleError {
326    /// Thrown if the bundle does not contain any transactions.
327    #[error("bundle missing txs")]
328    EmptyBundleTransactions,
329    /// Thrown if the bundle does not contain a block number, or block number is 0.
330    #[error("bundle missing blockNumber")]
331    BundleMissingBlockNumber,
332    /// Thrown when the blob gas usage of the blob transactions in a bundle exceed
333    /// [`MAX_BLOB_GAS_PER_BLOCK`].
334    #[error("blob gas usage exceeds the limit of {MAX_BLOB_GAS_PER_BLOCK} gas per block.")]
335    Eip4844BlobGasExceeded,
336}