1use core::fmt;
2use std::collections::BTreeMap;
3
4use alloy_consensus::Transaction;
5use alloy_primitives::Address;
6use alloy_rpc_types_txpool::{
7 TxpoolContent, TxpoolContentFrom, TxpoolInspect, TxpoolInspectSummary, TxpoolStatus,
8};
9use async_trait::async_trait;
10use jsonrpsee::core::RpcResult;
11use reth_rpc_api::TxPoolApiServer;
12use reth_rpc_types_compat::{transaction::from_recovered, TransactionCompat};
13use reth_transaction_pool::{
14 AllPoolTransactions, PoolConsensusTx, PoolTransaction, TransactionPool,
15};
16use tracing::trace;
17
/// `txpool` RPC API implementation.
///
/// Pairs a transaction pool with a response builder so that pooled transactions can be
/// reported to RPC clients.
#[derive(Clone)]
pub struct TxPoolApi<Pool, Eth> {
    /// The pool whose transactions are reported by the handlers.
    pool: Pool,
    /// Builder used to convert pooled transactions into RPC transaction responses.
    tx_resp_builder: Eth,
}
27
28impl<Pool, Eth> TxPoolApi<Pool, Eth> {
29 pub const fn new(pool: Pool, tx_resp_builder: Eth) -> Self {
31 Self { pool, tx_resp_builder }
32 }
33}
34
35impl<Pool, Eth> TxPoolApi<Pool, Eth>
36where
37 Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
38 Eth: TransactionCompat<PoolConsensusTx<Pool>>,
39{
40 fn content(&self) -> Result<TxpoolContent<Eth::Transaction>, Eth::Error> {
41 #[inline]
42 fn insert<Tx, RpcTxB>(
43 tx: &Tx,
44 content: &mut BTreeMap<Address, BTreeMap<String, RpcTxB::Transaction>>,
45 resp_builder: &RpcTxB,
46 ) -> Result<(), RpcTxB::Error>
47 where
48 Tx: PoolTransaction,
49 RpcTxB: TransactionCompat<Tx::Consensus>,
50 {
51 content.entry(tx.sender()).or_default().insert(
52 tx.nonce().to_string(),
53 from_recovered(tx.clone_into_consensus(), resp_builder)?,
54 );
55
56 Ok(())
57 }
58
59 let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
60
61 let mut content = TxpoolContent { pending: BTreeMap::new(), queued: BTreeMap::new() };
62 for pending in pending {
63 insert::<_, Eth>(&pending.transaction, &mut content.pending, &self.tx_resp_builder)?;
64 }
65 for queued in queued {
66 insert::<_, Eth>(&queued.transaction, &mut content.queued, &self.tx_resp_builder)?;
67 }
68
69 Ok(content)
70 }
71}
72
73#[async_trait]
74impl<Pool, Eth> TxPoolApiServer<Eth::Transaction> for TxPoolApi<Pool, Eth>
75where
76 Pool: TransactionPool<Transaction: PoolTransaction<Consensus: Transaction>> + 'static,
77 Eth: TransactionCompat<PoolConsensusTx<Pool>> + 'static,
78{
79 async fn txpool_status(&self) -> RpcResult<TxpoolStatus> {
85 trace!(target: "rpc::eth", "Serving txpool_status");
86 let all = self.pool.all_transactions();
87 Ok(TxpoolStatus { pending: all.pending.len() as u64, queued: all.queued.len() as u64 })
88 }
89
90 async fn txpool_inspect(&self) -> RpcResult<TxpoolInspect> {
97 trace!(target: "rpc::eth", "Serving txpool_inspect");
98
99 #[inline]
100 fn insert<T: PoolTransaction<Consensus: Transaction>>(
101 tx: &T,
102 inspect: &mut BTreeMap<Address, BTreeMap<String, TxpoolInspectSummary>>,
103 ) {
104 let entry = inspect.entry(tx.sender()).or_default();
105 let tx = tx.clone_into_consensus();
106 entry.insert(
107 tx.nonce().to_string(),
108 TxpoolInspectSummary {
109 to: tx.to(),
110 value: tx.value(),
111 gas: tx.gas_limit() as u128,
112 gas_price: tx.max_fee_per_gas(),
113 },
114 );
115 }
116
117 let AllPoolTransactions { pending, queued } = self.pool.all_transactions();
118
119 Ok(TxpoolInspect {
120 pending: pending.iter().fold(Default::default(), |mut acc, tx| {
121 insert(&tx.transaction, &mut acc);
122 acc
123 }),
124 queued: queued.iter().fold(Default::default(), |mut acc, tx| {
125 insert(&tx.transaction, &mut acc);
126 acc
127 }),
128 })
129 }
130
131 async fn txpool_content_from(
137 &self,
138 from: Address,
139 ) -> RpcResult<TxpoolContentFrom<Eth::Transaction>> {
140 trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom");
141 Ok(self.content().map_err(Into::into)?.remove_from(&from))
142 }
143
144 async fn txpool_content(&self) -> RpcResult<TxpoolContent<Eth::Transaction>> {
150 trace!(target: "rpc::eth", "Serving txpool_content");
151 Ok(self.content().map_err(Into::into)?)
152 }
153}
154
155impl<Pool, Eth> fmt::Debug for TxPoolApi<Pool, Eth> {
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 f.debug_struct("TxpoolApi").finish_non_exhaustive()
158 }
159}