reth_rpc/
txpool.rs

1use core::fmt;
2use std::collections::BTreeMap;
3
4use alloy_consensus::Transaction;
5use alloy_primitives::Address;
6use alloy_rpc_types_txpool::{
7    TxpoolContent, TxpoolContentFrom, TxpoolInspect, TxpoolInspectSummary, TxpoolStatus,
8};
9use async_trait::async_trait;
10use jsonrpsee::core::RpcResult;
11use reth_rpc_api::TxPoolApiServer;
12use reth_rpc_types_compat::{transaction::from_recovered, TransactionCompat};
13use reth_transaction_pool::{
14    AllPoolTransactions, PoolConsensusTx, PoolTransaction, TransactionPool,
15};
16use tracing::trace;
17
18/// `txpool` API implementation.
19///
20/// This type provides the functionality for handling `txpool` related requests.
21#[derive(Clone)]
22pub struct TxPoolApi<Pool, Eth> {
23    /// An interface to interact with the pool
24    pool: Pool,
25    tx_resp_builder: Eth,
26}
27
28impl<Pool, Eth> TxPoolApi<Pool, Eth> {
29    /// Creates a new instance of `TxpoolApi`.
30    pub const fn new(pool: Pool, tx_resp_builder: Eth) -> Self {
31        Self { pool, tx_resp_builder }
32    }
33}
34
35impl<Pool, Eth> TxPoolApi<Pool, Eth>
36where
37    Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
38    Eth: TransactionCompat<PoolConsensusTx<Pool>>,
39{
40    fn content(&self) -> Result<TxpoolContent<Eth::Transaction>, Eth::Error> {
41        #[inline]
42        fn insert<Tx, RpcTxB>(
43            tx: &Tx,
44            content: &mut BTreeMap<Address, BTreeMap<String, RpcTxB::Transaction>>,
45            resp_builder: &RpcTxB,
46        ) -> Result<(), RpcTxB::Error>
47        where
48            Tx: PoolTransaction,
49            RpcTxB: TransactionCompat<Tx::Consensus>,
50        {
51            content.entry(tx.sender()).or_default().insert(
52                tx.nonce().to_string(),
53                from_recovered(tx.clone_into_consensus(), resp_builder)?,
54            );
55
56            Ok(())
57        }
58
59        let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
60
61        let mut content = TxpoolContent { pending: BTreeMap::new(), queued: BTreeMap::new() };
62        for pending in pending {
63            insert::<_, Eth>(&pending.transaction, &mut content.pending, &self.tx_resp_builder)?;
64        }
65        for queued in queued {
66            insert::<_, Eth>(&queued.transaction, &mut content.queued, &self.tx_resp_builder)?;
67        }
68
69        Ok(content)
70    }
71}
72
73#[async_trait]
74impl<Pool, Eth> TxPoolApiServer<Eth::Transaction> for TxPoolApi<Pool, Eth>
75where
76    Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
77    Eth: TransactionCompat<PoolConsensusTx<Pool>> + 'static,
78{
79    /// Returns the number of transactions currently pending for inclusion in the next block(s), as
80    /// well as the ones that are being scheduled for future execution only.
81    /// Ref: [Here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status)
82    ///
83    /// Handler for `txpool_status`
84    async fn txpool_status(&self) -> RpcResult<TxpoolStatus> {
85        trace!(target: "rpc::eth", "Serving txpool_status");
86        let all = self.pool.all_transactions();
87        Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 })
88    }
89
90    /// Returns a summary of all the transactions currently pending for inclusion in the next
91    /// block(s), as well as the ones that are being scheduled for future execution only.
92    ///
93    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect) for more details
94    ///
95    /// Handler for `txpool_inspect`
96    async fn txpool_inspect(&self) -> RpcResult<TxpoolInspect> {
97        trace!(target: "rpc::eth", "Serving txpool_inspect");
98
99        #[inline]
100        fn insert<T: PoolTransaction<Consensus: Transaction>>(
101            tx: &T,
102            inspect: &mut BTreeMap<Address, BTreeMap<String, TxpoolInspectSummary>>,
103        ) {
104            let entry = inspect.entry(tx.sender()).or_default();
105            let tx = tx.clone_into_consensus();
106            entry.insert(
107                tx.nonce().to_string(),
108                TxpoolInspectSummary {
109                    to: tx.to(),
110                    value: tx.value(),
111                    gas: tx.gas_limit() as u128,
112                    gas_price: tx.max_fee_per_gas(),
113                },
114            );
115        }
116
117        let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
118
119        Ok(TxpoolInspect {
120            pending: pending.iter().fold(Default::default(), |mut acc, tx| {
121                insert(&tx.transaction, &mut acc);
122                acc
123            }),
124            queued: queued.iter().fold(Default::default(), |mut acc, tx| {
125                insert(&tx.transaction, &mut acc);
126                acc
127            }),
128        })
129    }
130
131    /// Retrieves the transactions contained within the txpool, returning pending as well as queued
132    /// transactions of this address, grouped by nonce.
133    ///
134    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_contentFrom) for more details
135    /// Handler for `txpool_contentFrom`
136    async fn txpool_content_from(
137        &self,
138        from: Address,
139    ) -> RpcResult<TxpoolContentFrom<Eth::Transaction>> {
140        trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom");
141        Ok(self.content().map_err(Into::into)?.remove_from(&from))
142    }
143
144    /// Returns the details of all transactions currently pending for inclusion in the next
145    /// block(s), as well as the ones that are being scheduled for future execution only.
146    ///
147    /// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content) for more details
148    /// Handler for `txpool_content`
149    async fn txpool_content(&self) -> RpcResult<TxpoolContent<Eth::Transaction>> {
150        trace!(target: "rpc::eth", "Serving txpool_content");
151        Ok(self.content().map_err(Into::into)?)
152    }
153}
154
155impl<Pool, Eth> fmt::Debug for TxPoolApi<Pool, Eth> {
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        f.debug_struct("TxpoolApi").finish_non_exhaustive()
158    }
159}