reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::TxHash;
5use alloy_rpc_types_eth::{
6    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
7    PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use jsonrpsee::{core::RpcResult, server::IdProvider};
11use reth_chainspec::ChainInfo;
12use reth_primitives::SealedBlockWithSenders;
13use reth_provider::{
14    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
15    ProviderError, ProviderReceipt,
16};
17use reth_rpc_eth_api::{
18    EthApiTypes, EthFilterApiServer, FullEthApiTypes, RpcNodeCoreExt, RpcTransaction,
19    TransactionCompat,
20};
21use reth_rpc_eth_types::{
22    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
23    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
24};
25use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
26use reth_rpc_types_compat::transaction::from_recovered;
27use reth_tasks::TaskSpawner;
28use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
29use std::{
30    collections::HashMap,
31    fmt,
32    iter::StepBy,
33    ops::RangeInclusive,
34    sync::Arc,
35    time::{Duration, Instant},
36};
37use tokio::{
38    sync::{mpsc::Receiver, Mutex},
39    time::MissedTickBehavior,
40};
41use tracing::{error, trace};
42
/// The maximum number of headers we read at once when handling a range filter.
///
/// Used as the `max_headers_range` chunk size of every filter instance created in this module.
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
45
46/// `Eth` filter RPC implementation.
47pub struct EthFilter<Eth: EthApiTypes> {
48    /// All nested fields bundled together
49    inner: Arc<EthFilterInner<Eth>>,
50}
51
52impl<Eth> Clone for EthFilter<Eth>
53where
54    Eth: EthApiTypes,
55{
56    fn clone(&self) -> Self {
57        Self { inner: self.inner.clone() }
58    }
59}
60
61impl<Eth> EthFilter<Eth>
62where
63    Eth: EthApiTypes + 'static,
64{
65    /// Creates a new, shareable instance.
66    ///
67    /// This uses the given pool to get notified about new transactions, the provider to interact
68    /// with the blockchain, the cache to fetch cacheable data, like the logs.
69    ///
70    /// See also [`EthFilterConfig`].
71    ///
72    /// This also spawns a task that periodically clears stale filters.
73    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
74        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
75            config;
76        let inner = EthFilterInner {
77            eth_api,
78            active_filters: ActiveFilters::new(),
79            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
80            max_headers_range: MAX_HEADERS_RANGE,
81            task_spawner,
82            stale_filter_ttl,
83            // if not set, use the max value, which is effectively no limit
84            max_blocks_per_filter: max_blocks_per_filter.unwrap_or(u64::MAX),
85            max_logs_per_response: max_logs_per_response.unwrap_or(usize::MAX),
86        };
87
88        let eth_filter = Self { inner: Arc::new(inner) };
89
90        let this = eth_filter.clone();
91        eth_filter.inner.task_spawner.spawn_critical(
92            "eth-filters_stale-filters-clean",
93            Box::pin(async move {
94                this.watch_and_clear_stale_filters().await;
95            }),
96        );
97
98        eth_filter
99    }
100
101    /// Returns all currently active filters
102    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
103        &self.inner.active_filters
104    }
105
106    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
107    /// Nonetheless, this endless future frees the thread at every await point.
108    async fn watch_and_clear_stale_filters(&self) {
109        let mut interval = tokio::time::interval_at(
110            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
111            self.inner.stale_filter_ttl,
112        );
113        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
114        loop {
115            interval.tick().await;
116            self.clear_stale_filters(Instant::now()).await;
117        }
118    }
119
120    /// Clears all filters that have not been polled for longer than the configured
121    /// `stale_filter_ttl` at the given instant.
122    pub async fn clear_stale_filters(&self, now: Instant) {
123        trace!(target: "rpc::eth", "clear stale filters");
124        self.active_filters().inner.lock().await.retain(|id, filter| {
125            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
126
127            if !is_valid {
128                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
129            }
130
131            is_valid
132        })
133    }
134}
135
136impl<Eth> EthFilter<Eth>
137where
138    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt,
139{
140    /// Access the underlying provider.
141    fn provider(&self) -> &Eth::Provider {
142        self.inner.eth_api.provider()
143    }
144
145    /// Access the underlying pool.
146    fn pool(&self) -> &Eth::Pool {
147        self.inner.eth_api.pool()
148    }
149
150    /// Returns all the filter changes for the given id, if any
151    pub async fn filter_changes(
152        &self,
153        id: FilterId,
154    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
155        let info = self.provider().chain_info()?;
156        let best_number = info.best_number;
157
158        // start_block is the block from which we should start fetching changes, the next block from
159        // the last time changes were polled, in other words the best block at last poll + 1
160        let (start_block, kind) = {
161            let mut filters = self.inner.active_filters.inner.lock().await;
162            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
163
164            if filter.block > best_number {
165                // no new blocks since the last poll
166                return Ok(FilterChanges::Empty)
167            }
168
169            // update filter
170            // we fetch all changes from [filter.block..best_block], so we advance the filter's
171            // block to `best_block +1`, the next from which we should start fetching changes again
172            let mut block = best_number + 1;
173            std::mem::swap(&mut filter.block, &mut block);
174            filter.last_poll_timestamp = Instant::now();
175
176            (block, filter.kind.clone())
177        };
178
179        match kind {
180            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
181            FilterKind::Block => {
182                // Note: we need to fetch the block hashes from inclusive range
183                // [start_block..best_block]
184                let end_block = best_number + 1;
185                let block_hashes =
186                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
187                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
188                    )?;
189                Ok(FilterChanges::Hashes(block_hashes))
190            }
191            FilterKind::Log(filter) => {
192                let (from_block_number, to_block_number) = match filter.block_option {
193                    FilterBlockOption::Range { from_block, to_block } => {
194                        let from = from_block
195                            .map(|num| self.provider().convert_block_number(num))
196                            .transpose()?
197                            .flatten();
198                        let to = to_block
199                            .map(|num| self.provider().convert_block_number(num))
200                            .transpose()?
201                            .flatten();
202                        logs_utils::get_filter_block_range(from, to, start_block, info)
203                    }
204                    FilterBlockOption::AtBlockHash(_) => {
205                        // blockHash is equivalent to fromBlock = toBlock = the block number with
206                        // hash blockHash
207                        // get_logs_in_block_range is inclusive
208                        (start_block, best_number)
209                    }
210                };
211                let logs = self
212                    .inner
213                    .get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
214                    .await?;
215                Ok(FilterChanges::Logs(logs))
216            }
217        }
218    }
219
220    /// Returns an array of all logs matching filter with given id.
221    ///
222    /// Returns an error if no matching log filter exists.
223    ///
224    /// Handler for `eth_getFilterLogs`
225    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
226        let filter = {
227            let filters = self.inner.active_filters.inner.lock().await;
228            if let FilterKind::Log(ref filter) =
229                filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
230            {
231                *filter.clone()
232            } else {
233                // Not a log filter
234                return Err(EthFilterError::FilterNotFound(id))
235            }
236        };
237
238        self.inner.logs_for_filter(filter).await
239    }
240}
241
242#[async_trait]
243impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
244where
245    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
246{
247    /// Handler for `eth_newFilter`
248    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
249        trace!(target: "rpc::eth", "Serving eth_newFilter");
250        self.inner
251            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
252            .await
253    }
254
255    /// Handler for `eth_newBlockFilter`
256    async fn new_block_filter(&self) -> RpcResult<FilterId> {
257        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
258        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
259    }
260
261    /// Handler for `eth_newPendingTransactionFilter`
262    async fn new_pending_transaction_filter(
263        &self,
264        kind: Option<PendingTransactionFilterKind>,
265    ) -> RpcResult<FilterId> {
266        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
267
268        let transaction_kind = match kind.unwrap_or_default() {
269            PendingTransactionFilterKind::Hashes => {
270                let receiver = self.pool().pending_transactions_listener();
271                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
272                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
273            }
274            PendingTransactionFilterKind::Full => {
275                let stream = self.pool().new_pending_pool_transactions_listener();
276                let full_txs_receiver = FullTransactionsReceiver::new(
277                    stream,
278                    self.inner.eth_api.tx_resp_builder().clone(),
279                );
280                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
281                    full_txs_receiver,
282                )))
283            }
284        };
285
286        //let filter = FilterKind::PendingTransaction(transaction_kind);
287
288        // Install the filter and propagate any errors
289        self.inner.install_filter(transaction_kind).await
290    }
291
292    /// Handler for `eth_getFilterChanges`
293    async fn filter_changes(
294        &self,
295        id: FilterId,
296    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
297        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
298        Ok(Self::filter_changes(self, id).await?)
299    }
300
301    /// Returns an array of all logs matching filter with given id.
302    ///
303    /// Returns an error if no matching log filter exists.
304    ///
305    /// Handler for `eth_getFilterLogs`
306    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
307        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
308        Ok(Self::filter_logs(self, id).await?)
309    }
310
311    /// Handler for `eth_uninstallFilter`
312    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
313        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
314        let mut filters = self.inner.active_filters.inner.lock().await;
315        if filters.remove(&id).is_some() {
316            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
317            Ok(true)
318        } else {
319            Ok(false)
320        }
321    }
322
323    /// Returns logs matching given filter object.
324    ///
325    /// Handler for `eth_getLogs`
326    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
327        trace!(target: "rpc::eth", "Serving eth_getLogs");
328        Ok(self.inner.logs_for_filter(filter).await?)
329    }
330}
331
332impl<Eth> std::fmt::Debug for EthFilter<Eth>
333where
334    Eth: EthApiTypes,
335{
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        f.debug_struct("EthFilter").finish_non_exhaustive()
338    }
339}
340
/// Container type for the state that `EthFilter` keeps behind its shared pointer.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// Maximum number of blocks that could be scanned per filter
    max_blocks_per_filter: u64,
    /// Maximum number of logs that can be returned in a response
    max_logs_per_response: usize,
    /// Maximum number of headers to read at once for range filter
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
361
362impl<Eth> EthFilterInner<Eth>
363where
364    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes,
365{
366    /// Access the underlying provider.
367    fn provider(&self) -> &Eth::Provider {
368        self.eth_api.provider()
369    }
370
371    /// Access the underlying [`EthStateCache`].
372    fn eth_cache(
373        &self,
374    ) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
375        self.eth_api.cache()
376    }
377
378    /// Returns logs matching given filter object.
379    async fn logs_for_filter(&self, filter: Filter) -> Result<Vec<Log>, EthFilterError> {
380        match filter.block_option {
381            FilterBlockOption::AtBlockHash(block_hash) => {
382                // for all matching logs in the block
383                // get the block header with the hash
384                let header = self
385                    .provider()
386                    .header_by_hash_or_number(block_hash.into())?
387                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
388
389                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
390
391                // we also need to ensure that the receipts are available and return an error if
392                // not, in case the block hash been reorged
393                let (receipts, maybe_block) = self
394                    .receipts_and_maybe_block(
395                        &block_num_hash,
396                        self.provider().chain_info()?.best_number,
397                    )
398                    .await?
399                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
400
401                let mut all_logs = Vec::new();
402                append_matching_block_logs(
403                    &mut all_logs,
404                    maybe_block
405                        .map(ProviderOrBlock::Block)
406                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
407                    &FilteredParams::new(Some(filter)),
408                    block_num_hash,
409                    &receipts,
410                    false,
411                    header.timestamp(),
412                )?;
413
414                Ok(all_logs)
415            }
416            FilterBlockOption::Range { from_block, to_block } => {
417                // compute the range
418                let info = self.provider().chain_info()?;
419
420                // we start at the most recent block if unset in filter
421                let start_block = info.best_number;
422                let from = from_block
423                    .map(|num| self.provider().convert_block_number(num))
424                    .transpose()?
425                    .flatten();
426                let to = to_block
427                    .map(|num| self.provider().convert_block_number(num))
428                    .transpose()?
429                    .flatten();
430                let (from_block_number, to_block_number) =
431                    logs_utils::get_filter_block_range(from, to, start_block, info);
432                self.get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
433                    .await
434            }
435        }
436    }
437
438    /// Installs a new filter and returns the new identifier.
439    async fn install_filter(
440        &self,
441        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
442    ) -> RpcResult<FilterId> {
443        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
444        let id = FilterId::from(self.id_provider.next_id());
445        let mut filters = self.active_filters.inner.lock().await;
446        filters.insert(
447            id.clone(),
448            ActiveFilter {
449                block: last_poll_block_number,
450                last_poll_timestamp: Instant::now(),
451                kind,
452            },
453        );
454        Ok(id)
455    }
456
457    /// Returns all logs in the given _inclusive_ range that match the filter
458    ///
459    /// Returns an error if:
460    ///  - underlying database error
461    ///  - amount of matches exceeds configured limit
462    async fn get_logs_in_block_range(
463        &self,
464        filter: &Filter,
465        from_block: u64,
466        to_block: u64,
467        chain_info: ChainInfo,
468    ) -> Result<Vec<Log>, EthFilterError> {
469        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
470
471        if to_block < from_block {
472            return Err(EthFilterError::InvalidBlockRangeParams)
473        }
474
475        if to_block - from_block > self.max_blocks_per_filter {
476            return Err(EthFilterError::QueryExceedsMaxBlocks(self.max_blocks_per_filter))
477        }
478
479        let mut all_logs = Vec::new();
480        let filter_params = FilteredParams::new(Some(filter.clone()));
481
482        // derive bloom filters from filter input, so we can check headers for matching logs
483        let address_filter = FilteredParams::address_filter(&filter.address);
484        let topics_filter = FilteredParams::topics_filter(&filter.topics);
485
486        // loop over the range of new blocks and check logs if the filter matches the log's bloom
487        // filter
488        for (from, to) in
489            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
490        {
491            let headers = self.provider().headers_range(from..=to)?;
492
493            for (idx, header) in headers.iter().enumerate() {
494                // only if filter matches
495                if FilteredParams::matches_address(header.logs_bloom(), &address_filter) &&
496                    FilteredParams::matches_topics(header.logs_bloom(), &topics_filter)
497                {
498                    // these are consecutive headers, so we can use the parent hash of the next
499                    // block to get the current header's hash
500                    let block_hash = match headers.get(idx + 1) {
501                        Some(parent) => parent.parent_hash(),
502                        None => self
503                            .provider()
504                            .block_hash(header.number())?
505                            .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
506                    };
507
508                    let num_hash = BlockNumHash::new(header.number(), block_hash);
509                    if let Some((receipts, maybe_block)) =
510                        self.receipts_and_maybe_block(&num_hash, chain_info.best_number).await?
511                    {
512                        append_matching_block_logs(
513                            &mut all_logs,
514                            maybe_block
515                                .map(ProviderOrBlock::Block)
516                                .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
517                            &filter_params,
518                            num_hash,
519                            &receipts,
520                            false,
521                            header.timestamp(),
522                        )?;
523
524                        // size check but only if range is multiple blocks, so we always return all
525                        // logs of a single block
526                        let is_multi_block_range = from_block != to_block;
527                        if is_multi_block_range && all_logs.len() > self.max_logs_per_response {
528                            return Err(EthFilterError::QueryExceedsMaxResults {
529                                max_logs: self.max_logs_per_response,
530                                from_block,
531                                to_block: num_hash.number.saturating_sub(1),
532                            });
533                        }
534                    }
535                }
536            }
537        }
538
539        Ok(all_logs)
540    }
541
542    /// Retrieves receipts and block from cache if near the tip (4 blocks), otherwise only receipts.
543    async fn receipts_and_maybe_block(
544        &self,
545        block_num_hash: &BlockNumHash,
546        best_number: u64,
547    ) -> Result<
548        Option<(
549            Arc<Vec<ProviderReceipt<Eth::Provider>>>,
550            Option<Arc<SealedBlockWithSenders<ProviderBlock<Eth::Provider>>>>,
551        )>,
552        EthFilterError,
553    > {
554        // The last 4 blocks are most likely cached, so we can just fetch them
555        let cached_range = best_number.saturating_sub(4)..=best_number;
556        let receipts_block = if cached_range.contains(&block_num_hash.number) {
557            self.eth_cache()
558                .get_block_and_receipts(block_num_hash.hash)
559                .await?
560                .map(|(b, r)| (r, Some(b)))
561        } else {
562            self.eth_cache().get_receipts(block_num_hash.hash).await?.map(|r| (r, None))
563        };
564        Ok(receipts_block)
565    }
566}
567
568/// All active filters
569#[derive(Debug, Clone, Default)]
570pub struct ActiveFilters<T> {
571    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
572}
573
574impl<T> ActiveFilters<T> {
575    /// Returns an empty instance.
576    pub fn new() -> Self {
577        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
578    }
579}
580
/// An installed filter
#[derive(Debug)]
struct ActiveFilter<T> {
    /// At which block the filter was polled last.
    ///
    /// Set to the best block number when the filter is installed; `filter_changes` advances it
    /// to `best_number + 1`, the block the next poll starts from.
    block: u64,
    /// Last time this filter was polled.
    ///
    /// Compared against `stale_filter_ttl` when stale filters are cleared.
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
591
592/// A receiver for pending transactions that returns all new transactions since the last poll.
593#[derive(Debug, Clone)]
594struct PendingTransactionsReceiver {
595    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
596}
597
598impl PendingTransactionsReceiver {
599    fn new(receiver: Receiver<TxHash>) -> Self {
600        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
601    }
602
603    /// Returns all new pending transactions received since the last poll.
604    async fn drain<T>(&self) -> FilterChanges<T> {
605        let mut pending_txs = Vec::new();
606        let mut prepared_stream = self.txs_receiver.lock().await;
607
608        while let Ok(tx_hash) = prepared_stream.try_recv() {
609            pending_txs.push(tx_hash);
610        }
611
612        // Convert the vector of hashes into FilterChanges::Hashes
613        FilterChanges::Hashes(pending_txs)
614    }
615}
616
617/// A structure to manage and provide access to a stream of full transaction details.
618#[derive(Debug, Clone)]
619struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
620    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
621    tx_resp_builder: TxCompat,
622}
623
624impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
625where
626    T: PoolTransaction + 'static,
627    TxCompat: TransactionCompat<T::Consensus>,
628{
629    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
630    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
631        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
632    }
633
634    /// Returns all new pending transactions received since the last poll.
635    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
636        let mut pending_txs = Vec::new();
637        let mut prepared_stream = self.txs_stream.lock().await;
638
639        while let Ok(tx) = prepared_stream.try_recv() {
640            match from_recovered(tx.transaction.to_consensus(), &self.tx_resp_builder) {
641                Ok(tx) => pending_txs.push(tx),
642                Err(err) => {
643                    error!(target: "rpc",
644                        %err,
645                        "Failed to fill txn with block context"
646                    );
647                }
648            }
649        }
650        FilterChanges::Transactions(pending_txs)
651    }
652}
653
/// Helper trait for [FullTransactionsReceiver] to erase the `Transaction` type.
///
/// This allows a receiver to be stored as a trait object that only depends on the RPC
/// transaction type `T`.
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns all new pending transactions received since the last poll.
    async fn drain(&self) -> FilterChanges<T>;
}

#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<TxCompat::Transaction>
    for FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: TransactionCompat<T::Consensus> + 'static,
{
    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
        // inherent associated functions take precedence over trait methods in path resolution,
        // so this delegates to `FullTransactionsReceiver::drain` instead of recursing
        Self::drain(self).await
    }
}
671
672/// Represents the kind of pending transaction data that can be retrieved.
673///
674/// This enum differentiates between two kinds of pending transaction data:
675/// - Just the transaction hashes.
676/// - Full transaction details.
677#[derive(Debug, Clone)]
678enum PendingTransactionKind<T> {
679    Hashes(PendingTransactionsReceiver),
680    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
681}
682
683impl<T: 'static> PendingTransactionKind<T> {
684    async fn drain(&self) -> FilterChanges<T> {
685        match self {
686            Self::Hashes(receiver) => receiver.drain().await,
687            Self::FullTransaction(receiver) => receiver.drain().await,
688        }
689    }
690}
691
/// The kind of an installed filter, which determines what a poll of the filter returns.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// A log filter, as installed by `eth_newFilter`.
    Log(Box<Filter>),
    /// A block filter, as installed by `eth_newBlockFilter`.
    Block,
    /// A pending transaction filter, as installed by `eth_newPendingTransactionFilter`.
    PendingTransaction(PendingTransactionKind<T>),
}
698
/// An iterator that yields _inclusive_ block ranges of a given step size
///
/// Every item is a `(start, end)` pair with `end - start <= step`. Consecutive items are
/// adjacent (`next.start == previous.end + 1`) and together cover the whole input range.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// The last block of the whole range (inclusive).
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator that splits `range` into chunks of at most `step + 1` blocks.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // chunks span `step + 1` blocks, so that is the distance between two chunk starts;
        // saturate instead of overflowing if `step` does not fit into `usize`
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start` never exceeds `self.end`, and the saturating add cannot wrap around, so the
        // resulting chunk is always well formed, even for ranges that end at `u64::MAX`
        let end = start.saturating_add(self.step).min(self.end);
        if end == self.end {
            // the range is fully covered; drop the remaining starts so that a saturated stride
            // can never produce an overlapping chunk
            self.iter = RangeInclusive::new(1, 0).step_by(1);
        }
        Some((start, end))
    }
}
725
/// Errors that can occur in the handler implementation
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// Filter not found.
    ///
    /// Also returned when the logs of a filter are requested but the id does not refer to a log
    /// filter.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// Invalid block range: the end of the range is lower than its start.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// Query scope is too broad: the range spans more blocks than the contained limit.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// Query result is too large.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response
        max_logs: usize,
        /// Start block of the suggested retry range
        from_block: u64,
        /// End block of the suggested retry range (last successfully processed block)
        to_block: u64,
    },
    /// Error serving request in `eth_` namespace.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// Error thrown when a spawned task failed to deliver a response.
    #[error("internal filter error")]
    InternalError,
}
755
756impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
757    fn from(err: EthFilterError) -> Self {
758        match err {
759            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
760                jsonrpsee::types::error::INVALID_PARAMS_CODE,
761                "filter not found",
762            ),
763            err @ EthFilterError::InternalError => {
764                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
765            }
766            EthFilterError::EthAPIError(err) => err.into(),
767            err @ (EthFilterError::InvalidBlockRangeParams |
768            EthFilterError::QueryExceedsMaxBlocks(_) |
769            EthFilterError::QueryExceedsMaxResults { .. }) => {
770                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
771            }
772        }
773    }
774}
775
776impl From<ProviderError> for EthFilterError {
777    fn from(err: ProviderError) -> Self {
778        Self::EthAPIError(err.into())
779    }
780}
781
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use reth_testing_utils::generators;

    /// Checks that the chunks of a random range start at the range start, are adjacent to each
    /// other and end at the range end.
    #[test]
    fn test_block_range_iter() {
        let mut rng = generators::rng();

        let first_block = u64::from(rng.gen::<u32>());
        let last_block = first_block.saturating_add(u64::from(rng.gen::<u32>()));
        let step = u64::from(rng.gen::<u16>());

        let mut chunks = BlockRangeInclusiveIter::new(first_block..=last_block, step);

        let (chunk_start, chunk_end) = chunks.next().unwrap();
        assert_eq!(chunk_start, first_block);
        assert_eq!(chunk_end, (chunk_start + step).min(last_block));

        let final_end = chunks.fold(chunk_end, |previous_end, (next_start, next_end)| {
            // ensure range starts with previous end + 1
            assert_eq!(next_start, previous_end + 1);
            next_end
        });

        assert_eq!(final_end, last_block);
    }
}
807        assert_eq!(end, *range.end());
808    }
809}