reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::TxHash;
5use alloy_rpc_types_eth::{
6    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7    PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::future::TryFutureExt;
11use jsonrpsee::{core::RpcResult, server::IdProvider};
12use reth_errors::ProviderError;
13use reth_rpc_eth_api::{
14    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcNodeCoreExt,
15    RpcTransaction, TransactionCompat,
16};
17use reth_rpc_eth_types::{
18    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
19    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
20};
21use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
22use reth_storage_api::{
23    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
24    ProviderReceipt,
25};
26use reth_tasks::TaskSpawner;
27use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
28use std::{
29    collections::HashMap,
30    fmt,
31    future::Future,
32    iter::StepBy,
33    ops::RangeInclusive,
34    sync::Arc,
35    time::{Duration, Instant},
36};
37use tokio::{
38    sync::{mpsc::Receiver, oneshot, Mutex},
39    time::MissedTickBehavior,
40};
41use tracing::{error, trace};
42
43impl<Eth> EngineEthFilter for EthFilter<Eth>
44where
45    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
46{
47    /// Returns logs matching given filter object, no query limits
48    fn logs(
49        &self,
50        filter: Filter,
51        limits: QueryLimits,
52    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
53        trace!(target: "rpc::eth", "Serving eth_getLogs");
54        self.logs_for_filter(filter, limits).map_err(|e| e.into())
55    }
56}
57
58/// The maximum number of headers we read at once when handling a range filter.
59const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
60
61/// `Eth` filter RPC implementation.
62///
63/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
64pub struct EthFilter<Eth: EthApiTypes> {
65    /// All nested fields bundled together
66    inner: Arc<EthFilterInner<Eth>>,
67}
68
69impl<Eth> Clone for EthFilter<Eth>
70where
71    Eth: EthApiTypes,
72{
73    fn clone(&self) -> Self {
74        Self { inner: self.inner.clone() }
75    }
76}
77
78impl<Eth> EthFilter<Eth>
79where
80    Eth: EthApiTypes + 'static,
81{
82    /// Creates a new, shareable instance.
83    ///
84    /// This uses the given pool to get notified about new transactions, the provider to interact
85    /// with the blockchain, the cache to fetch cacheable data, like the logs.
86    ///
87    /// See also [`EthFilterConfig`].
88    ///
89    /// This also spawns a task that periodically clears stale filters.
90    ///
91    /// # Create a new instance with [`EthApi`](crate::EthApi)
92    ///
93    /// ```no_run
94    /// use reth_evm_ethereum::EthEvmConfig;
95    /// use reth_network_api::noop::NoopNetwork;
96    /// use reth_provider::noop::NoopProvider;
97    /// use reth_rpc::{EthApi, EthFilter};
98    /// use reth_tasks::TokioTaskExecutor;
99    /// use reth_transaction_pool::noop::NoopTransactionPool;
100    /// let eth_api = EthApi::builder(
101    ///     NoopProvider::default(),
102    ///     NoopTransactionPool::default(),
103    ///     NoopNetwork::default(),
104    ///     EthEvmConfig::mainnet(),
105    /// )
106    /// .build();
107    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
108    /// ```
109    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
110        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
111            config;
112        let inner = EthFilterInner {
113            eth_api,
114            active_filters: ActiveFilters::new(),
115            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
116            max_headers_range: MAX_HEADERS_RANGE,
117            task_spawner,
118            stale_filter_ttl,
119            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
120        };
121
122        let eth_filter = Self { inner: Arc::new(inner) };
123
124        let this = eth_filter.clone();
125        eth_filter.inner.task_spawner.spawn_critical(
126            "eth-filters_stale-filters-clean",
127            Box::pin(async move {
128                this.watch_and_clear_stale_filters().await;
129            }),
130        );
131
132        eth_filter
133    }
134
135    /// Returns all currently active filters
136    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
137        &self.inner.active_filters
138    }
139
140    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
141    /// Nonetheless, this endless future frees the thread at every await point.
142    async fn watch_and_clear_stale_filters(&self) {
143        let mut interval = tokio::time::interval_at(
144            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
145            self.inner.stale_filter_ttl,
146        );
147        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
148        loop {
149            interval.tick().await;
150            self.clear_stale_filters(Instant::now()).await;
151        }
152    }
153
154    /// Clears all filters that have not been polled for longer than the configured
155    /// `stale_filter_ttl` at the given instant.
156    pub async fn clear_stale_filters(&self, now: Instant) {
157        trace!(target: "rpc::eth", "clear stale filters");
158        self.active_filters().inner.lock().await.retain(|id, filter| {
159            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
160
161            if !is_valid {
162                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
163            }
164
165            is_valid
166        })
167    }
168}
169
170impl<Eth> EthFilter<Eth>
171where
172    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
173{
174    /// Access the underlying provider.
175    fn provider(&self) -> &Eth::Provider {
176        self.inner.eth_api.provider()
177    }
178
179    /// Access the underlying pool.
180    fn pool(&self) -> &Eth::Pool {
181        self.inner.eth_api.pool()
182    }
183
184    /// Returns all the filter changes for the given id, if any
185    pub async fn filter_changes(
186        &self,
187        id: FilterId,
188    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
189        let info = self.provider().chain_info()?;
190        let best_number = info.best_number;
191
192        // start_block is the block from which we should start fetching changes, the next block from
193        // the last time changes were polled, in other words the best block at last poll + 1
194        let (start_block, kind) = {
195            let mut filters = self.inner.active_filters.inner.lock().await;
196            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
197
198            if filter.block > best_number {
199                // no new blocks since the last poll
200                return Ok(FilterChanges::Empty)
201            }
202
203            // update filter
204            // we fetch all changes from [filter.block..best_block], so we advance the filter's
205            // block to `best_block +1`, the next from which we should start fetching changes again
206            let mut block = best_number + 1;
207            std::mem::swap(&mut filter.block, &mut block);
208            filter.last_poll_timestamp = Instant::now();
209
210            (block, filter.kind.clone())
211        };
212
213        match kind {
214            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
215            FilterKind::Block => {
216                // Note: we need to fetch the block hashes from inclusive range
217                // [start_block..best_block]
218                let end_block = best_number + 1;
219                let block_hashes =
220                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
221                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
222                    )?;
223                Ok(FilterChanges::Hashes(block_hashes))
224            }
225            FilterKind::Log(filter) => {
226                let (from_block_number, to_block_number) = match filter.block_option {
227                    FilterBlockOption::Range { from_block, to_block } => {
228                        let from = from_block
229                            .map(|num| self.provider().convert_block_number(num))
230                            .transpose()?
231                            .flatten();
232                        let to = to_block
233                            .map(|num| self.provider().convert_block_number(num))
234                            .transpose()?
235                            .flatten();
236                        logs_utils::get_filter_block_range(from, to, start_block, info)
237                    }
238                    FilterBlockOption::AtBlockHash(_) => {
239                        // blockHash is equivalent to fromBlock = toBlock = the block number with
240                        // hash blockHash
241                        // get_logs_in_block_range is inclusive
242                        (start_block, best_number)
243                    }
244                };
245                let logs = self
246                    .inner
247                    .clone()
248                    .get_logs_in_block_range(
249                        *filter,
250                        from_block_number,
251                        to_block_number,
252                        self.inner.query_limits,
253                    )
254                    .await?;
255                Ok(FilterChanges::Logs(logs))
256            }
257        }
258    }
259
260    /// Returns an array of all logs matching filter with given id.
261    ///
262    /// Returns an error if no matching log filter exists.
263    ///
264    /// Handler for `eth_getFilterLogs`
265    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
266        let filter = {
267            let filters = self.inner.active_filters.inner.lock().await;
268            if let FilterKind::Log(ref filter) =
269                filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
270            {
271                *filter.clone()
272            } else {
273                // Not a log filter
274                return Err(EthFilterError::FilterNotFound(id))
275            }
276        };
277
278        self.logs_for_filter(filter, self.inner.query_limits).await
279    }
280
281    /// Returns logs matching given filter object.
282    async fn logs_for_filter(
283        &self,
284        filter: Filter,
285        limits: QueryLimits,
286    ) -> Result<Vec<Log>, EthFilterError> {
287        self.inner.clone().logs_for_filter(filter, limits).await
288    }
289}
290
291#[async_trait]
292impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
293where
294    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
295{
296    /// Handler for `eth_newFilter`
297    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
298        trace!(target: "rpc::eth", "Serving eth_newFilter");
299        self.inner
300            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
301            .await
302    }
303
304    /// Handler for `eth_newBlockFilter`
305    async fn new_block_filter(&self) -> RpcResult<FilterId> {
306        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
307        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
308    }
309
310    /// Handler for `eth_newPendingTransactionFilter`
311    async fn new_pending_transaction_filter(
312        &self,
313        kind: Option<PendingTransactionFilterKind>,
314    ) -> RpcResult<FilterId> {
315        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
316
317        let transaction_kind = match kind.unwrap_or_default() {
318            PendingTransactionFilterKind::Hashes => {
319                let receiver = self.pool().pending_transactions_listener();
320                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
321                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
322            }
323            PendingTransactionFilterKind::Full => {
324                let stream = self.pool().new_pending_pool_transactions_listener();
325                let full_txs_receiver = FullTransactionsReceiver::new(
326                    stream,
327                    self.inner.eth_api.tx_resp_builder().clone(),
328                );
329                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
330                    full_txs_receiver,
331                )))
332            }
333        };
334
335        //let filter = FilterKind::PendingTransaction(transaction_kind);
336
337        // Install the filter and propagate any errors
338        self.inner.install_filter(transaction_kind).await
339    }
340
341    /// Handler for `eth_getFilterChanges`
342    async fn filter_changes(
343        &self,
344        id: FilterId,
345    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
346        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
347        Ok(Self::filter_changes(self, id).await?)
348    }
349
350    /// Returns an array of all logs matching filter with given id.
351    ///
352    /// Returns an error if no matching log filter exists.
353    ///
354    /// Handler for `eth_getFilterLogs`
355    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
356        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
357        Ok(Self::filter_logs(self, id).await?)
358    }
359
360    /// Handler for `eth_uninstallFilter`
361    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
362        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
363        let mut filters = self.inner.active_filters.inner.lock().await;
364        if filters.remove(&id).is_some() {
365            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
366            Ok(true)
367        } else {
368            Ok(false)
369        }
370    }
371
372    /// Returns logs matching given filter object.
373    ///
374    /// Handler for `eth_getLogs`
375    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
376        trace!(target: "rpc::eth", "Serving eth_getLogs");
377        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
378    }
379}
380
381impl<Eth> std::fmt::Debug for EthFilter<Eth>
382where
383    Eth: EthApiTypes,
384{
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        f.debug_struct("EthFilter").finish_non_exhaustive()
387    }
388}
389
390/// Container type `EthFilter`
391#[derive(Debug)]
392struct EthFilterInner<Eth: EthApiTypes> {
393    /// Inner `eth` API implementation.
394    eth_api: Eth,
395    /// All currently installed filters.
396    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
397    /// Provides ids to identify filters
398    id_provider: Arc<dyn IdProvider>,
399    /// limits for logs queries
400    query_limits: QueryLimits,
401    /// maximum number of headers to read at once for range filter
402    max_headers_range: u64,
403    /// The type that can spawn tasks.
404    task_spawner: Box<dyn TaskSpawner>,
405    /// Duration since the last filter poll, after which the filter is considered stale
406    stale_filter_ttl: Duration,
407}
408
409impl<Eth> EthFilterInner<Eth>
410where
411    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
412{
413    /// Access the underlying provider.
414    fn provider(&self) -> &Eth::Provider {
415        self.eth_api.provider()
416    }
417
418    /// Access the underlying [`EthStateCache`].
419    fn eth_cache(
420        &self,
421    ) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
422        self.eth_api.cache()
423    }
424
425    /// Returns logs matching given filter object.
426    async fn logs_for_filter(
427        self: Arc<Self>,
428        filter: Filter,
429        limits: QueryLimits,
430    ) -> Result<Vec<Log>, EthFilterError> {
431        match filter.block_option {
432            FilterBlockOption::AtBlockHash(block_hash) => {
433                // for all matching logs in the block
434                // get the block header with the hash
435                let header = self
436                    .provider()
437                    .header_by_hash_or_number(block_hash.into())?
438                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
439
440                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
441
442                // we also need to ensure that the receipts are available and return an error if
443                // not, in case the block hash been reorged
444                let (receipts, maybe_block) = self
445                    .eth_cache()
446                    .get_receipts_and_maybe_block(block_num_hash.hash)
447                    .await?
448                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
449
450                let mut all_logs = Vec::new();
451                append_matching_block_logs(
452                    &mut all_logs,
453                    maybe_block
454                        .map(ProviderOrBlock::Block)
455                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
456                    &filter,
457                    block_num_hash,
458                    &receipts,
459                    false,
460                    header.timestamp(),
461                )?;
462
463                Ok(all_logs)
464            }
465            FilterBlockOption::Range { from_block, to_block } => {
466                // compute the range
467                let info = self.provider().chain_info()?;
468
469                // we start at the most recent block if unset in filter
470                let start_block = info.best_number;
471                let from = from_block
472                    .map(|num| self.provider().convert_block_number(num))
473                    .transpose()?
474                    .flatten();
475                let to = to_block
476                    .map(|num| self.provider().convert_block_number(num))
477                    .transpose()?
478                    .flatten();
479                let (from_block_number, to_block_number) =
480                    logs_utils::get_filter_block_range(from, to, start_block, info);
481                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
482                    .await
483            }
484        }
485    }
486
487    /// Installs a new filter and returns the new identifier.
488    async fn install_filter(
489        &self,
490        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
491    ) -> RpcResult<FilterId> {
492        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
493        let subscription_id = self.id_provider.next_id();
494
495        let id = match subscription_id {
496            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
497            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
498        };
499        let mut filters = self.active_filters.inner.lock().await;
500        filters.insert(
501            id.clone(),
502            ActiveFilter {
503                block: last_poll_block_number,
504                last_poll_timestamp: Instant::now(),
505                kind,
506            },
507        );
508        Ok(id)
509    }
510
511    /// Returns all logs in the given _inclusive_ range that match the filter
512    ///
513    /// Returns an error if:
514    ///  - underlying database error
515    ///  - amount of matches exceeds configured limit
516    async fn get_logs_in_block_range(
517        self: Arc<Self>,
518        filter: Filter,
519        from_block: u64,
520        to_block: u64,
521        limits: QueryLimits,
522    ) -> Result<Vec<Log>, EthFilterError> {
523        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
524
525        // perform boundary checks first
526        if to_block < from_block {
527            return Err(EthFilterError::InvalidBlockRangeParams)
528        }
529
530        if let Some(max_blocks_per_filter) =
531            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
532        {
533            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
534        }
535
536        let (tx, rx) = oneshot::channel();
537        let this = self.clone();
538        self.task_spawner.spawn_blocking(Box::pin(async move {
539            let res =
540                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
541            let _ = tx.send(res);
542        }));
543
544        rx.await.map_err(|_| EthFilterError::InternalError)?
545    }
546
547    /// Returns all logs in the given _inclusive_ range that match the filter
548    ///
549    /// Note: This function uses a mix of blocking db operations for fetching indices and header
550    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
551    /// This function is considered blocking and should thus be spawned on a blocking task.
552    ///
553    /// Returns an error if:
554    ///  - underlying database error
555    async fn get_logs_in_block_range_inner(
556        &self,
557        filter: &Filter,
558        from_block: u64,
559        to_block: u64,
560        limits: QueryLimits,
561    ) -> Result<Vec<Log>, EthFilterError> {
562        let mut all_logs = Vec::new();
563
564        // loop over the range of new blocks and check logs if the filter matches the log's bloom
565        // filter
566        for (from, to) in
567            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
568        {
569            let headers = self.provider().headers_range(from..=to)?;
570            for (idx, header) in headers
571                .iter()
572                .enumerate()
573                .filter(|(_, header)| filter.matches_bloom(header.logs_bloom()))
574            {
575                // these are consecutive headers, so we can use the parent hash of the next
576                // block to get the current header's hash
577                let block_hash = match headers.get(idx + 1) {
578                    Some(child) => child.parent_hash(),
579                    None => self
580                        .provider()
581                        .block_hash(header.number())?
582                        .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
583                };
584
585                let num_hash = BlockNumHash::new(header.number(), block_hash);
586                if let Some((receipts, maybe_block)) =
587                    self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
588                {
589                    append_matching_block_logs(
590                        &mut all_logs,
591                        maybe_block
592                            .map(ProviderOrBlock::Block)
593                            .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
594                        filter,
595                        num_hash,
596                        &receipts,
597                        false,
598                        header.timestamp(),
599                    )?;
600
601                    // size check but only if range is multiple blocks, so we always return all
602                    // logs of a single block
603                    let is_multi_block_range = from_block != to_block;
604                    if let Some(max_logs_per_response) = limits.max_logs_per_response {
605                        if is_multi_block_range && all_logs.len() > max_logs_per_response {
606                            return Err(EthFilterError::QueryExceedsMaxResults {
607                                max_logs: max_logs_per_response,
608                                from_block,
609                                to_block: num_hash.number.saturating_sub(1),
610                            });
611                        }
612                    }
613                }
614            }
615        }
616
617        Ok(all_logs)
618    }
619}
620
621/// All active filters
622#[derive(Debug, Clone, Default)]
623pub struct ActiveFilters<T> {
624    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
625}
626
627impl<T> ActiveFilters<T> {
628    /// Returns an empty instance.
629    pub fn new() -> Self {
630        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
631    }
632}
633
634/// An installed filter
635#[derive(Debug)]
636struct ActiveFilter<T> {
637    /// At which block the filter was polled last.
638    block: u64,
639    /// Last time this filter was polled.
640    last_poll_timestamp: Instant,
641    /// What kind of filter it is.
642    kind: FilterKind<T>,
643}
644
645/// A receiver for pending transactions that returns all new transactions since the last poll.
646#[derive(Debug, Clone)]
647struct PendingTransactionsReceiver {
648    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
649}
650
651impl PendingTransactionsReceiver {
652    fn new(receiver: Receiver<TxHash>) -> Self {
653        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
654    }
655
656    /// Returns all new pending transactions received since the last poll.
657    async fn drain<T>(&self) -> FilterChanges<T> {
658        let mut pending_txs = Vec::new();
659        let mut prepared_stream = self.txs_receiver.lock().await;
660
661        while let Ok(tx_hash) = prepared_stream.try_recv() {
662            pending_txs.push(tx_hash);
663        }
664
665        // Convert the vector of hashes into FilterChanges::Hashes
666        FilterChanges::Hashes(pending_txs)
667    }
668}
669
670/// A structure to manage and provide access to a stream of full transaction details.
671#[derive(Debug, Clone)]
672struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
673    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
674    tx_resp_builder: TxCompat,
675}
676
677impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
678where
679    T: PoolTransaction + 'static,
680    TxCompat: TransactionCompat<T::Consensus>,
681{
682    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
683    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
684        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
685    }
686
687    /// Returns all new pending transactions received since the last poll.
688    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
689        let mut pending_txs = Vec::new();
690        let mut prepared_stream = self.txs_stream.lock().await;
691
692        while let Ok(tx) = prepared_stream.try_recv() {
693            match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
694                Ok(tx) => pending_txs.push(tx),
695                Err(err) => {
696                    error!(target: "rpc",
697                        %err,
698                        "Failed to fill txn with block context"
699                    );
700                }
701            }
702        }
703        FilterChanges::Transactions(pending_txs)
704    }
705}
706
707/// Helper trait for [FullTransactionsReceiver] to erase the `Transaction` type.
708#[async_trait]
709trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
710    async fn drain(&self) -> FilterChanges<T>;
711}
712
713#[async_trait]
714impl<T, TxCompat> FullTransactionsFilter<TxCompat::Transaction>
715    for FullTransactionsReceiver<T, TxCompat>
716where
717    T: PoolTransaction + 'static,
718    TxCompat: TransactionCompat<T::Consensus> + 'static,
719{
720    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
721        Self::drain(self).await
722    }
723}
724
725/// Represents the kind of pending transaction data that can be retrieved.
726///
727/// This enum differentiates between two kinds of pending transaction data:
728/// - Just the transaction hashes.
729/// - Full transaction details.
730#[derive(Debug, Clone)]
731enum PendingTransactionKind<T> {
732    Hashes(PendingTransactionsReceiver),
733    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
734}
735
736impl<T: 'static> PendingTransactionKind<T> {
737    async fn drain(&self) -> FilterChanges<T> {
738        match self {
739            Self::Hashes(receiver) => receiver.drain().await,
740            Self::FullTransaction(receiver) => receiver.drain().await,
741        }
742    }
743}
744
745#[derive(Clone, Debug)]
746enum FilterKind<T> {
747    Log(Box<Filter>),
748    Block,
749    PendingTransaction(PendingTransactionKind<T>),
750}
751
752/// An iterator that yields _inclusive_ block ranges of a given step size
753#[derive(Debug)]
754struct BlockRangeInclusiveIter {
755    iter: StepBy<RangeInclusive<u64>>,
756    step: u64,
757    end: u64,
758}
759
760impl BlockRangeInclusiveIter {
761    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
762        Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
763    }
764}
765
766impl Iterator for BlockRangeInclusiveIter {
767    type Item = (u64, u64);
768
769    fn next(&mut self) -> Option<Self::Item> {
770        let start = self.iter.next()?;
771        let end = (start + self.step).min(self.end);
772        if start > end {
773            return None
774        }
775        Some((start, end))
776    }
777}
778
779/// Errors that can occur in the handler implementation
780#[derive(Debug, thiserror::Error)]
781pub enum EthFilterError {
782    /// Filter not found.
783    #[error("filter not found")]
784    FilterNotFound(FilterId),
785    /// Invalid block range.
786    #[error("invalid block range params")]
787    InvalidBlockRangeParams,
788    /// Query scope is too broad.
789    #[error("query exceeds max block range {0}")]
790    QueryExceedsMaxBlocks(u64),
791    /// Query result is too large.
792    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
793    QueryExceedsMaxResults {
794        /// Maximum number of logs allowed per response
795        max_logs: usize,
796        /// Start block of the suggested retry range
797        from_block: u64,
798        /// End block of the suggested retry range (last successfully processed block)
799        to_block: u64,
800    },
801    /// Error serving request in `eth_` namespace.
802    #[error(transparent)]
803    EthAPIError(#[from] EthApiError),
804    /// Error thrown when a spawned task failed to deliver a response.
805    #[error("internal filter error")]
806    InternalError,
807}
808
809impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
810    fn from(err: EthFilterError) -> Self {
811        match err {
812            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
813                jsonrpsee::types::error::INVALID_PARAMS_CODE,
814                "filter not found",
815            ),
816            err @ EthFilterError::InternalError => {
817                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
818            }
819            EthFilterError::EthAPIError(err) => err.into(),
820            err @ (EthFilterError::InvalidBlockRangeParams |
821            EthFilterError::QueryExceedsMaxBlocks(_) |
822            EthFilterError::QueryExceedsMaxResults { .. }) => {
823                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
824            }
825        }
826    }
827}
828
829impl From<ProviderError> for EthFilterError {
830    fn from(err: ProviderError) -> Self {
831        Self::EthAPIError(err.into())
832    }
833}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838    use rand::Rng;
839    use reth_testing_utils::generators;
840
841    #[test]
842    fn test_block_range_iter() {
843        let mut rng = generators::rng();
844
845        let start = rng.random::<u32>() as u64;
846        let end = start.saturating_add(rng.random::<u32>() as u64);
847        let step = rng.random::<u16>() as u64;
848        let range = start..=end;
849        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
850        let (from, mut end) = iter.next().unwrap();
851        assert_eq!(from, start);
852        assert_eq!(end, (from + step).min(*range.end()));
853
854        for (next_from, next_end) in iter {
855            // ensure range starts with previous end + 1
856            assert_eq!(next_from, end + 1);
857            end = next_end;
858        }
859
860        assert_eq!(end, *range.end());
861    }
862}