1use std::sync::Arc;
5
6use alloy_consensus::BlockHeader;
7use alloy_eips::BlockNumberOrTag;
8use alloy_network::Ethereum;
9use alloy_primitives::{Bytes, U256};
10use derive_more::Deref;
11use reth_primitives::NodePrimitives;
12use reth_provider::{
13 BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ProviderBlock,
14 ProviderReceipt,
15};
16use reth_rpc_eth_api::{
17 helpers::{EthSigner, SpawnBlocking},
18 node::RpcNodeCoreExt,
19 EthApiTypes, RpcNodeCore,
20};
21use reth_rpc_eth_types::{
22 EthApiBuilderCtx, EthApiError, EthStateCache, FeeHistoryCache, GasCap, GasPriceOracle,
23 PendingBlock,
24};
25use reth_tasks::{
26 pool::{BlockingTaskGuard, BlockingTaskPool},
27 TaskSpawner, TokioTaskExecutor,
28};
29use tokio::sync::{broadcast, Mutex};
30
31use crate::eth::EthTxBuilder;
32
33const DEFAULT_BROADCAST_CAPACITY: usize = 2000;
34
35#[derive(Deref)]
45pub struct EthApi<Provider: BlockReader, Pool, Network, EvmConfig> {
46 #[deref]
48 pub(super) inner: Arc<EthApiInner<Provider, Pool, Network, EvmConfig>>,
49 pub tx_resp_builder: EthTxBuilder,
51}
52
53impl<Provider, Pool, Network, EvmConfig> Clone for EthApi<Provider, Pool, Network, EvmConfig>
54where
55 Provider: BlockReader,
56{
57 fn clone(&self) -> Self {
58 Self { inner: self.inner.clone(), tx_resp_builder: EthTxBuilder }
59 }
60}
61
62impl<Provider, Pool, Network, EvmConfig> EthApi<Provider, Pool, Network, EvmConfig>
63where
64 Provider: BlockReaderIdExt,
65{
66 #[allow(clippy::too_many_arguments)]
68 pub fn new(
69 provider: Provider,
70 pool: Pool,
71 network: Network,
72 eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
73 gas_oracle: GasPriceOracle<Provider>,
74 gas_cap: impl Into<GasCap>,
75 max_simulate_blocks: u64,
76 eth_proof_window: u64,
77 blocking_task_pool: BlockingTaskPool,
78 fee_history_cache: FeeHistoryCache,
79 evm_config: EvmConfig,
80 proof_permits: usize,
81 ) -> Self {
82 let inner = EthApiInner::new(
83 provider,
84 pool,
85 network,
86 eth_cache,
87 gas_oracle,
88 gas_cap,
89 max_simulate_blocks,
90 eth_proof_window,
91 blocking_task_pool,
92 fee_history_cache,
93 evm_config,
94 TokioTaskExecutor::default(),
95 proof_permits,
96 );
97
98 Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder }
99 }
100}
101
102impl<Provider, Pool, EvmConfig, Network> EthApi<Provider, Pool, Network, EvmConfig>
103where
104 Provider: ChainSpecProvider + BlockReaderIdExt + Clone + 'static,
105 Pool: Clone,
106 EvmConfig: Clone,
107 Network: Clone,
108{
109 pub fn with_spawner<Tasks, Events>(
111 ctx: &EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>,
112 ) -> Self
113 where
114 Tasks: TaskSpawner + Clone + 'static,
115 Events: CanonStateSubscriptions<
116 Primitives: NodePrimitives<
117 Block = ProviderBlock<Provider>,
118 Receipt = ProviderReceipt<Provider>,
119 >,
120 >,
121 {
122 let blocking_task_pool =
123 BlockingTaskPool::build().expect("failed to build blocking task pool");
124
125 let inner = EthApiInner::new(
126 ctx.provider.clone(),
127 ctx.pool.clone(),
128 ctx.network.clone(),
129 ctx.cache.clone(),
130 ctx.new_gas_price_oracle(),
131 ctx.config.rpc_gas_cap,
132 ctx.config.rpc_max_simulate_blocks,
133 ctx.config.eth_proof_window,
134 blocking_task_pool,
135 ctx.new_fee_history_cache(),
136 ctx.evm_config.clone(),
137 ctx.executor.clone(),
138 ctx.config.proof_permits,
139 );
140
141 Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder }
142 }
143}
144
145impl<Provider, Pool, Network, EvmConfig> EthApiTypes for EthApi<Provider, Pool, Network, EvmConfig>
146where
147 Self: Send + Sync,
148 Provider: BlockReader,
149{
150 type Error = EthApiError;
151 type NetworkTypes = Ethereum;
152 type TransactionCompat = EthTxBuilder;
153
154 fn tx_resp_builder(&self) -> &Self::TransactionCompat {
155 &self.tx_resp_builder
156 }
157}
158
159impl<Provider, Pool, Network, EvmConfig> RpcNodeCore for EthApi<Provider, Pool, Network, EvmConfig>
160where
161 Provider: BlockReader + Send + Sync + Clone + Unpin,
162 Pool: Send + Sync + Clone + Unpin,
163 Network: Send + Sync + Clone,
164 EvmConfig: Send + Sync + Clone + Unpin,
165{
166 type Provider = Provider;
167 type Pool = Pool;
168 type Evm = EvmConfig;
169 type Network = Network;
170 type PayloadBuilder = ();
171
172 fn pool(&self) -> &Self::Pool {
173 self.inner.pool()
174 }
175
176 fn evm_config(&self) -> &Self::Evm {
177 self.inner.evm_config()
178 }
179
180 fn network(&self) -> &Self::Network {
181 self.inner.network()
182 }
183
184 fn payload_builder(&self) -> &Self::PayloadBuilder {
185 &()
186 }
187
188 fn provider(&self) -> &Self::Provider {
189 self.inner.provider()
190 }
191}
192
193impl<Provider, Pool, Network, EvmConfig> RpcNodeCoreExt
194 for EthApi<Provider, Pool, Network, EvmConfig>
195where
196 Provider: BlockReader + Send + Sync + Clone + Unpin,
197 Pool: Send + Sync + Clone + Unpin,
198 Network: Send + Sync + Clone,
199 EvmConfig: Send + Sync + Clone + Unpin,
200{
201 #[inline]
202 fn cache(&self) -> &EthStateCache<ProviderBlock<Provider>, ProviderReceipt<Provider>> {
203 self.inner.cache()
204 }
205}
206
207impl<Provider, Pool, Network, EvmConfig> std::fmt::Debug
208 for EthApi<Provider, Pool, Network, EvmConfig>
209where
210 Provider: BlockReader,
211{
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 f.debug_struct("EthApi").finish_non_exhaustive()
214 }
215}
216
217impl<Provider, Pool, Network, EvmConfig> SpawnBlocking
218 for EthApi<Provider, Pool, Network, EvmConfig>
219where
220 Self: Clone + Send + Sync + 'static,
221 Provider: BlockReader,
222{
223 #[inline]
224 fn io_task_spawner(&self) -> impl TaskSpawner {
225 self.inner.task_spawner()
226 }
227
228 #[inline]
229 fn tracing_task_pool(&self) -> &BlockingTaskPool {
230 self.inner.blocking_task_pool()
231 }
232
233 #[inline]
234 fn tracing_task_guard(&self) -> &BlockingTaskGuard {
235 self.inner.blocking_task_guard()
236 }
237}
238
239#[allow(missing_debug_implementations)]
241pub struct EthApiInner<Provider: BlockReader, Pool, Network, EvmConfig> {
242 pool: Pool,
244 provider: Provider,
246 network: Network,
248 signers: parking_lot::RwLock<Vec<Box<dyn EthSigner<Provider::Transaction>>>>,
250 eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
252 gas_oracle: GasPriceOracle<Provider>,
254 gas_cap: u64,
256 max_simulate_blocks: u64,
258 eth_proof_window: u64,
260 starting_block: U256,
262 task_spawner: Box<dyn TaskSpawner>,
264 pending_block: Mutex<Option<PendingBlock<Provider::Block, Provider::Receipt>>>,
266 blocking_task_pool: BlockingTaskPool,
268 fee_history_cache: FeeHistoryCache,
270 evm_config: EvmConfig,
272
273 blocking_task_guard: BlockingTaskGuard,
275
276 raw_tx_sender: broadcast::Sender<Bytes>,
278}
279
280impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig>
281where
282 Provider: BlockReaderIdExt,
283{
284 #[allow(clippy::too_many_arguments)]
286 pub fn new(
287 provider: Provider,
288 pool: Pool,
289 network: Network,
290 eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
291 gas_oracle: GasPriceOracle<Provider>,
292 gas_cap: impl Into<GasCap>,
293 max_simulate_blocks: u64,
294 eth_proof_window: u64,
295 blocking_task_pool: BlockingTaskPool,
296 fee_history_cache: FeeHistoryCache,
297 evm_config: EvmConfig,
298 task_spawner: impl TaskSpawner + 'static,
299 proof_permits: usize,
300 ) -> Self {
301 let signers = parking_lot::RwLock::new(Default::default());
302 let starting_block = U256::from(
304 provider
305 .header_by_number_or_tag(BlockNumberOrTag::Latest)
306 .ok()
307 .flatten()
308 .map(|header| header.number())
309 .unwrap_or_default(),
310 );
311
312 let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY);
313
314 Self {
315 provider,
316 pool,
317 network,
318 signers,
319 eth_cache,
320 gas_oracle,
321 gas_cap: gas_cap.into().into(),
322 max_simulate_blocks,
323 eth_proof_window,
324 starting_block,
325 task_spawner: Box::new(task_spawner),
326 pending_block: Default::default(),
327 blocking_task_pool,
328 fee_history_cache,
329 evm_config,
330 blocking_task_guard: BlockingTaskGuard::new(proof_permits),
331 raw_tx_sender,
332 }
333 }
334}
335
336impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig>
337where
338 Provider: BlockReader,
339{
340 #[inline]
342 pub const fn provider(&self) -> &Provider {
343 &self.provider
344 }
345
346 #[inline]
348 pub const fn cache(&self) -> &EthStateCache<Provider::Block, Provider::Receipt> {
349 &self.eth_cache
350 }
351
352 #[inline]
354 pub const fn pending_block(
355 &self,
356 ) -> &Mutex<Option<PendingBlock<Provider::Block, Provider::Receipt>>> {
357 &self.pending_block
358 }
359
360 #[inline]
362 pub const fn task_spawner(&self) -> &dyn TaskSpawner {
363 &*self.task_spawner
364 }
365
366 #[inline]
368 pub const fn blocking_task_pool(&self) -> &BlockingTaskPool {
369 &self.blocking_task_pool
370 }
371
372 #[inline]
374 pub const fn evm_config(&self) -> &EvmConfig {
375 &self.evm_config
376 }
377
378 #[inline]
380 pub const fn pool(&self) -> &Pool {
381 &self.pool
382 }
383
384 #[inline]
386 pub const fn gas_cap(&self) -> u64 {
387 self.gas_cap
388 }
389
390 #[inline]
392 pub const fn max_simulate_blocks(&self) -> u64 {
393 self.max_simulate_blocks
394 }
395
396 #[inline]
398 pub const fn gas_oracle(&self) -> &GasPriceOracle<Provider> {
399 &self.gas_oracle
400 }
401
402 #[inline]
404 pub const fn fee_history_cache(&self) -> &FeeHistoryCache {
405 &self.fee_history_cache
406 }
407
408 #[inline]
410 pub const fn signers(
411 &self,
412 ) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<Provider::Transaction>>>> {
413 &self.signers
414 }
415
416 #[inline]
418 pub const fn starting_block(&self) -> U256 {
419 self.starting_block
420 }
421
422 #[inline]
424 pub const fn network(&self) -> &Network {
425 &self.network
426 }
427
428 #[inline]
430 pub const fn eth_proof_window(&self) -> u64 {
431 self.eth_proof_window
432 }
433
434 #[inline]
436 pub const fn blocking_task_guard(&self) -> &BlockingTaskGuard {
437 &self.blocking_task_guard
438 }
439
440 #[inline]
442 pub fn subscribe_to_raw_transactions(&self) -> broadcast::Receiver<Bytes> {
443 self.raw_tx_sender.subscribe()
444 }
445
446 #[inline]
448 pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) {
449 let _ = self.raw_tx_sender.send(raw_tx);
450 }
451}
452
453#[cfg(test)]
454mod tests {
455 use crate::EthApi;
456 use alloy_consensus::Header;
457 use alloy_eips::BlockNumberOrTag;
458 use alloy_primitives::{PrimitiveSignature as Signature, B256, U64};
459 use alloy_rpc_types::FeeHistory;
460 use jsonrpsee_types::error::INVALID_PARAMS_CODE;
461 use reth_chainspec::{BaseFeeParams, ChainSpec};
462 use reth_evm_ethereum::EthEvmConfig;
463 use reth_network_api::noop::NoopNetwork;
464 use reth_primitives::{Block, BlockBody, TransactionSigned};
465 use reth_provider::{
466 test_utils::{MockEthProvider, NoopProvider},
467 BlockReader, BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderFactory,
468 };
469 use reth_rpc_eth_api::EthApiServer;
470 use reth_rpc_eth_types::{
471 EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasCap, GasPriceOracle,
472 };
473 use reth_rpc_server_types::constants::{
474 DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_PROOF_PERMITS,
475 };
476 use reth_tasks::pool::BlockingTaskPool;
477 use reth_testing_utils::{generators, generators::Rng};
478 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
479
480 fn build_test_eth_api<
481 P: BlockReaderIdExt<
482 Block = reth_primitives::Block,
483 Receipt = reth_primitives::Receipt,
484 Header = reth_primitives::Header,
485 > + BlockReader
486 + ChainSpecProvider<ChainSpec = ChainSpec>
487 + EvmEnvProvider
488 + StateProviderFactory
489 + Unpin
490 + Clone
491 + 'static,
492 >(
493 provider: P,
494 ) -> EthApi<P, TestPool, NoopNetwork, EthEvmConfig> {
495 let evm_config = EthEvmConfig::new(provider.chain_spec());
496 let cache = EthStateCache::spawn(provider.clone(), Default::default());
497 let fee_history_cache = FeeHistoryCache::new(FeeHistoryCacheConfig::default());
498
499 EthApi::new(
500 provider.clone(),
501 testing_pool(),
502 NoopNetwork::default(),
503 cache.clone(),
504 GasPriceOracle::new(provider, Default::default(), cache),
505 GasCap::default(),
506 DEFAULT_MAX_SIMULATE_BLOCKS,
507 DEFAULT_ETH_PROOF_WINDOW,
508 BlockingTaskPool::build().expect("failed to build tracing pool"),
509 fee_history_cache,
510 evm_config,
511 DEFAULT_PROOF_PERMITS,
512 )
513 }
514
515 fn prepare_eth_api(
517 newest_block: u64,
518 mut oldest_block: Option<B256>,
519 block_count: u64,
520 mock_provider: MockEthProvider,
521 ) -> (EthApi<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>, Vec<u128>, Vec<f64>) {
522 let mut rng = generators::rng();
523
524 let mut gas_used_ratios = Vec::with_capacity(block_count as usize);
526 let mut base_fees_per_gas = Vec::with_capacity(block_count as usize);
527 let mut last_header = None;
528 let mut parent_hash = B256::default();
529
530 for i in (0..block_count).rev() {
531 let hash = rng.gen();
532 let gas_limit = rng.gen::<u32>() as u64;
534 let base_fee_per_gas: Option<u64> = rng.gen::<bool>().then(|| rng.gen::<u32>() as u64);
535 let gas_used = rng.gen::<u32>() as u64;
536
537 let header = Header {
538 number: newest_block - i,
539 gas_limit,
540 gas_used,
541 base_fee_per_gas: base_fee_per_gas.map(Into::into),
542 parent_hash,
543 ..Default::default()
544 };
545 last_header = Some(header.clone());
546 parent_hash = hash;
547
548 const TOTAL_TRANSACTIONS: usize = 100;
549 let mut transactions = Vec::with_capacity(TOTAL_TRANSACTIONS);
550 for _ in 0..TOTAL_TRANSACTIONS {
551 let random_fee: u128 = rng.gen();
552
553 if let Some(base_fee_per_gas) = header.base_fee_per_gas {
554 let transaction = TransactionSigned::new_unhashed(
555 reth_primitives::Transaction::Eip1559(alloy_consensus::TxEip1559 {
556 max_priority_fee_per_gas: random_fee,
557 max_fee_per_gas: random_fee + base_fee_per_gas as u128,
558 ..Default::default()
559 }),
560 Signature::test_signature(),
561 );
562
563 transactions.push(transaction);
564 } else {
565 let transaction = TransactionSigned::new_unhashed(
566 reth_primitives::Transaction::Legacy(Default::default()),
567 Signature::test_signature(),
568 );
569
570 transactions.push(transaction);
571 }
572 }
573
574 mock_provider.add_block(
575 hash,
576 Block {
577 header: header.clone(),
578 body: BlockBody { transactions, ..Default::default() },
579 },
580 );
581 mock_provider.add_header(hash, header);
582
583 oldest_block.get_or_insert(hash);
584 gas_used_ratios.push(gas_used as f64 / gas_limit as f64);
585 base_fees_per_gas.push(base_fee_per_gas.map(|fee| fee as u128).unwrap_or_default());
586 }
587
588 let last_header = last_header.unwrap();
590 base_fees_per_gas.push(BaseFeeParams::ethereum().next_block_base_fee(
591 last_header.gas_used,
592 last_header.gas_limit,
593 last_header.base_fee_per_gas.unwrap_or_default(),
594 ) as u128);
595
596 let eth_api = build_test_eth_api(mock_provider);
597
598 (eth_api, base_fees_per_gas, gas_used_ratios)
599 }
600
601 #[tokio::test]
603 async fn test_fee_history_empty() {
604 let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
605 &build_test_eth_api(NoopProvider::default()),
606 U64::from(1),
607 BlockNumberOrTag::Latest,
608 None,
609 )
610 .await;
611 assert!(response.is_err());
612 let error_object = response.unwrap_err();
613 assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
614 }
615
616 #[tokio::test]
617 async fn test_fee_history_invalid_block_range_before_genesis() {
619 let block_count = 10;
620 let newest_block = 1337;
621 let oldest_block = None;
622
623 let (eth_api, _, _) =
624 prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
625
626 let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
627 ð_api,
628 U64::from(newest_block + 1),
629 newest_block.into(),
630 Some(vec![10.0]),
631 )
632 .await;
633
634 assert!(response.is_err());
635 let error_object = response.unwrap_err();
636 assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
637 }
638
639 #[tokio::test]
640 async fn test_fee_history_invalid_block_range_in_future() {
642 let block_count = 10;
643 let newest_block = 1337;
644 let oldest_block = None;
645
646 let (eth_api, _, _) =
647 prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
648
649 let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
650 ð_api,
651 U64::from(1),
652 (newest_block + 1000).into(),
653 Some(vec![10.0]),
654 )
655 .await;
656
657 assert!(response.is_err());
658 let error_object = response.unwrap_err();
659 assert_eq!(error_object.code(), INVALID_PARAMS_CODE);
660 }
661
662 #[tokio::test]
663 async fn test_fee_history_no_block_requested() {
665 let block_count = 10;
666 let newest_block = 1337;
667 let oldest_block = None;
668
669 let (eth_api, _, _) =
670 prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
671
672 let response = <EthApi<_, _, _, _> as EthApiServer<_, _, _, _>>::fee_history(
673 ð_api,
674 U64::from(0),
675 newest_block.into(),
676 None,
677 )
678 .await
679 .unwrap();
680 assert_eq!(
681 response,
682 FeeHistory::default(),
683 "none: requesting no block should yield a default response"
684 );
685 }
686
687 #[tokio::test]
688 async fn test_fee_history_single_block() {
690 let block_count = 10;
691 let newest_block = 1337;
692 let oldest_block = None;
693
694 let (eth_api, base_fees_per_gas, gas_used_ratios) =
695 prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
696
697 let fee_history =
698 eth_api.fee_history(U64::from(1), newest_block.into(), None).await.unwrap();
699 assert_eq!(
700 fee_history.base_fee_per_gas,
701 &base_fees_per_gas[base_fees_per_gas.len() - 2..],
702 "one: base fee per gas is incorrect"
703 );
704 assert_eq!(
705 fee_history.base_fee_per_gas.len(),
706 2,
707 "one: should return base fee of the next block as well"
708 );
709 assert_eq!(
710 &fee_history.gas_used_ratio,
711 &gas_used_ratios[gas_used_ratios.len() - 1..],
712 "one: gas used ratio is incorrect"
713 );
714 assert_eq!(fee_history.oldest_block, newest_block, "one: oldest block is incorrect");
715 assert!(
716 fee_history.reward.is_none(),
717 "one: no percentiles were requested, so there should be no rewards result"
718 );
719 }
720
721 #[tokio::test]
723 async fn test_fee_history_all_blocks() {
724 let block_count = 10;
725 let newest_block = 1337;
726 let oldest_block = None;
727
728 let (eth_api, base_fees_per_gas, gas_used_ratios) =
729 prepare_eth_api(newest_block, oldest_block, block_count, MockEthProvider::default());
730
731 let fee_history =
732 eth_api.fee_history(U64::from(block_count), newest_block.into(), None).await.unwrap();
733
734 assert_eq!(
735 &fee_history.base_fee_per_gas, &base_fees_per_gas,
736 "all: base fee per gas is incorrect"
737 );
738 assert_eq!(
739 fee_history.base_fee_per_gas.len() as u64,
740 block_count + 1,
741 "all: should return base fee of the next block as well"
742 );
743 assert_eq!(
744 &fee_history.gas_used_ratio, &gas_used_ratios,
745 "all: gas used ratio is incorrect"
746 );
747 assert_eq!(
748 fee_history.oldest_block,
749 newest_block - block_count + 1,
750 "all: oldest block is incorrect"
751 );
752 assert!(
753 fee_history.reward.is_none(),
754 "all: no percentiles were requested, so there should be no rewards result"
755 );
756 }
757}