1use alloy_consensus::BlockHeader;
4use alloy_primitives::TxHash;
5use alloy_rpc_types_eth::{
6 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7 PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::future::TryFutureExt;
11use jsonrpsee::{core::RpcResult, server::IdProvider};
12use reth_errors::ProviderError;
13use reth_rpc_eth_api::{
14 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcNodeCoreExt,
15 RpcTransaction, TransactionCompat,
16};
17use reth_rpc_eth_types::{
18 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
19 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
20};
21use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
22use reth_storage_api::{
23 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
24 ProviderReceipt,
25};
26use reth_tasks::TaskSpawner;
27use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
28use std::{
29 collections::HashMap,
30 fmt,
31 future::Future,
32 iter::StepBy,
33 ops::RangeInclusive,
34 sync::Arc,
35 time::{Duration, Instant},
36};
37use tokio::{
38 sync::{mpsc::Receiver, oneshot, Mutex},
39 time::MissedTickBehavior,
40};
41use tracing::{error, trace};
42
43impl<Eth> EngineEthFilter for EthFilter<Eth>
44where
45 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
46{
47 fn logs(
49 &self,
50 filter: Filter,
51 limits: QueryLimits,
52 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
53 trace!(target: "rpc::eth", "Serving eth_getLogs");
54 self.logs_for_filter(filter, limits).map_err(|e| e.into())
55 }
56}
57
/// Chunk step used when a block-range log query reads headers from the provider.
///
/// It is passed as the step of [`BlockRangeInclusiveIter`], so a single header read covers at
/// most `MAX_HEADERS_RANGE + 1` consecutive blocks.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Handler for the `eth_` filter RPC methods.
///
/// All state lives behind a shared [`Arc`], so clones refer to the same set of installed
/// filters.
pub struct EthFilter<Eth: EthApiTypes> {
    /// Shared state: the wrapped API, the installed filters and the configured limits.
    inner: Arc<EthFilterInner<Eth>>,
}
68
69impl<Eth> Clone for EthFilter<Eth>
70where
71 Eth: EthApiTypes,
72{
73 fn clone(&self) -> Self {
74 Self { inner: self.inner.clone() }
75 }
76}
77
78impl<Eth> EthFilter<Eth>
79where
80 Eth: EthApiTypes + 'static,
81{
82 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
110 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
111 config;
112 let inner = EthFilterInner {
113 eth_api,
114 active_filters: ActiveFilters::new(),
115 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
116 max_headers_range: MAX_HEADERS_RANGE,
117 task_spawner,
118 stale_filter_ttl,
119 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
120 };
121
122 let eth_filter = Self { inner: Arc::new(inner) };
123
124 let this = eth_filter.clone();
125 eth_filter.inner.task_spawner.spawn_critical(
126 "eth-filters_stale-filters-clean",
127 Box::pin(async move {
128 this.watch_and_clear_stale_filters().await;
129 }),
130 );
131
132 eth_filter
133 }
134
135 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
137 &self.inner.active_filters
138 }
139
140 async fn watch_and_clear_stale_filters(&self) {
143 let mut interval = tokio::time::interval_at(
144 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
145 self.inner.stale_filter_ttl,
146 );
147 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
148 loop {
149 interval.tick().await;
150 self.clear_stale_filters(Instant::now()).await;
151 }
152 }
153
154 pub async fn clear_stale_filters(&self, now: Instant) {
157 trace!(target: "rpc::eth", "clear stale filters");
158 self.active_filters().inner.lock().await.retain(|id, filter| {
159 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
160
161 if !is_valid {
162 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
163 }
164
165 is_valid
166 })
167 }
168}
169
170impl<Eth> EthFilter<Eth>
171where
172 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
173{
174 fn provider(&self) -> &Eth::Provider {
176 self.inner.eth_api.provider()
177 }
178
179 fn pool(&self) -> &Eth::Pool {
181 self.inner.eth_api.pool()
182 }
183
184 pub async fn filter_changes(
186 &self,
187 id: FilterId,
188 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
189 let info = self.provider().chain_info()?;
190 let best_number = info.best_number;
191
192 let (start_block, kind) = {
195 let mut filters = self.inner.active_filters.inner.lock().await;
196 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
197
198 if filter.block > best_number {
199 return Ok(FilterChanges::Empty)
201 }
202
203 let mut block = best_number + 1;
207 std::mem::swap(&mut filter.block, &mut block);
208 filter.last_poll_timestamp = Instant::now();
209
210 (block, filter.kind.clone())
211 };
212
213 match kind {
214 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
215 FilterKind::Block => {
216 let end_block = best_number + 1;
219 let block_hashes =
220 self.provider().canonical_hashes_range(start_block, end_block).map_err(
221 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
222 )?;
223 Ok(FilterChanges::Hashes(block_hashes))
224 }
225 FilterKind::Log(filter) => {
226 let (from_block_number, to_block_number) = match filter.block_option {
227 FilterBlockOption::Range { from_block, to_block } => {
228 let from = from_block
229 .map(|num| self.provider().convert_block_number(num))
230 .transpose()?
231 .flatten();
232 let to = to_block
233 .map(|num| self.provider().convert_block_number(num))
234 .transpose()?
235 .flatten();
236 logs_utils::get_filter_block_range(from, to, start_block, info)
237 }
238 FilterBlockOption::AtBlockHash(_) => {
239 (start_block, best_number)
243 }
244 };
245 let logs = self
246 .inner
247 .clone()
248 .get_logs_in_block_range(
249 *filter,
250 from_block_number,
251 to_block_number,
252 self.inner.query_limits,
253 )
254 .await?;
255 Ok(FilterChanges::Logs(logs))
256 }
257 }
258 }
259
260 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
266 let filter = {
267 let filters = self.inner.active_filters.inner.lock().await;
268 if let FilterKind::Log(ref filter) =
269 filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
270 {
271 *filter.clone()
272 } else {
273 return Err(EthFilterError::FilterNotFound(id))
275 }
276 };
277
278 self.logs_for_filter(filter, self.inner.query_limits).await
279 }
280
281 async fn logs_for_filter(
283 &self,
284 filter: Filter,
285 limits: QueryLimits,
286 ) -> Result<Vec<Log>, EthFilterError> {
287 self.inner.clone().logs_for_filter(filter, limits).await
288 }
289}
290
291#[async_trait]
292impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
293where
294 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
295{
296 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
298 trace!(target: "rpc::eth", "Serving eth_newFilter");
299 self.inner
300 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
301 .await
302 }
303
304 async fn new_block_filter(&self) -> RpcResult<FilterId> {
306 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
307 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
308 }
309
310 async fn new_pending_transaction_filter(
312 &self,
313 kind: Option<PendingTransactionFilterKind>,
314 ) -> RpcResult<FilterId> {
315 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
316
317 let transaction_kind = match kind.unwrap_or_default() {
318 PendingTransactionFilterKind::Hashes => {
319 let receiver = self.pool().pending_transactions_listener();
320 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
321 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
322 }
323 PendingTransactionFilterKind::Full => {
324 let stream = self.pool().new_pending_pool_transactions_listener();
325 let full_txs_receiver = FullTransactionsReceiver::new(
326 stream,
327 self.inner.eth_api.tx_resp_builder().clone(),
328 );
329 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
330 full_txs_receiver,
331 )))
332 }
333 };
334
335 self.inner.install_filter(transaction_kind).await
339 }
340
341 async fn filter_changes(
343 &self,
344 id: FilterId,
345 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
346 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
347 Ok(Self::filter_changes(self, id).await?)
348 }
349
350 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
356 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
357 Ok(Self::filter_logs(self, id).await?)
358 }
359
360 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
362 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
363 let mut filters = self.inner.active_filters.inner.lock().await;
364 if filters.remove(&id).is_some() {
365 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
366 Ok(true)
367 } else {
368 Ok(false)
369 }
370 }
371
372 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
376 trace!(target: "rpc::eth", "Serving eth_getLogs");
377 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
378 }
379}
380
381impl<Eth> std::fmt::Debug for EthFilter<Eth>
382where
383 Eth: EthApiTypes,
384{
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 f.debug_struct("EthFilter").finish_non_exhaustive()
387 }
388}
389
/// State shared by all clones of [`EthFilter`].
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// The wrapped `eth` API; source of the provider, pool, cache and response builder.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Generates the ids handed out for newly installed filters.
    id_provider: Arc<dyn IdProvider>,
    /// Limits applied to log queries served through the RPC handlers.
    query_limits: QueryLimits,
    /// Step used to split a block range into chunks when reading headers.
    max_headers_range: u64,
    /// Spawns the stale-filter cleanup task and the blocking log-range queries.
    task_spawner: Box<dyn TaskSpawner>,
    /// How long a filter may go unpolled before it is evicted.
    stale_filter_ttl: Duration,
}
408
409impl<Eth> EthFilterInner<Eth>
410where
411 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
412{
413 fn provider(&self) -> &Eth::Provider {
415 self.eth_api.provider()
416 }
417
418 fn eth_cache(
420 &self,
421 ) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
422 self.eth_api.cache()
423 }
424
425 async fn logs_for_filter(
427 self: Arc<Self>,
428 filter: Filter,
429 limits: QueryLimits,
430 ) -> Result<Vec<Log>, EthFilterError> {
431 match filter.block_option {
432 FilterBlockOption::AtBlockHash(block_hash) => {
433 let header = self
436 .provider()
437 .header_by_hash_or_number(block_hash.into())?
438 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
439
440 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
441
442 let (receipts, maybe_block) = self
445 .eth_cache()
446 .get_receipts_and_maybe_block(block_num_hash.hash)
447 .await?
448 .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
449
450 let mut all_logs = Vec::new();
451 append_matching_block_logs(
452 &mut all_logs,
453 maybe_block
454 .map(ProviderOrBlock::Block)
455 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
456 &filter,
457 block_num_hash,
458 &receipts,
459 false,
460 header.timestamp(),
461 )?;
462
463 Ok(all_logs)
464 }
465 FilterBlockOption::Range { from_block, to_block } => {
466 let info = self.provider().chain_info()?;
468
469 let start_block = info.best_number;
471 let from = from_block
472 .map(|num| self.provider().convert_block_number(num))
473 .transpose()?
474 .flatten();
475 let to = to_block
476 .map(|num| self.provider().convert_block_number(num))
477 .transpose()?
478 .flatten();
479 let (from_block_number, to_block_number) =
480 logs_utils::get_filter_block_range(from, to, start_block, info);
481 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
482 .await
483 }
484 }
485 }
486
487 async fn install_filter(
489 &self,
490 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
491 ) -> RpcResult<FilterId> {
492 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
493 let subscription_id = self.id_provider.next_id();
494
495 let id = match subscription_id {
496 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
497 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
498 };
499 let mut filters = self.active_filters.inner.lock().await;
500 filters.insert(
501 id.clone(),
502 ActiveFilter {
503 block: last_poll_block_number,
504 last_poll_timestamp: Instant::now(),
505 kind,
506 },
507 );
508 Ok(id)
509 }
510
511 async fn get_logs_in_block_range(
517 self: Arc<Self>,
518 filter: Filter,
519 from_block: u64,
520 to_block: u64,
521 limits: QueryLimits,
522 ) -> Result<Vec<Log>, EthFilterError> {
523 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
524
525 if to_block < from_block {
527 return Err(EthFilterError::InvalidBlockRangeParams)
528 }
529
530 if let Some(max_blocks_per_filter) =
531 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
532 {
533 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
534 }
535
536 let (tx, rx) = oneshot::channel();
537 let this = self.clone();
538 self.task_spawner.spawn_blocking(Box::pin(async move {
539 let res =
540 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
541 let _ = tx.send(res);
542 }));
543
544 rx.await.map_err(|_| EthFilterError::InternalError)?
545 }
546
547 async fn get_logs_in_block_range_inner(
556 &self,
557 filter: &Filter,
558 from_block: u64,
559 to_block: u64,
560 limits: QueryLimits,
561 ) -> Result<Vec<Log>, EthFilterError> {
562 let mut all_logs = Vec::new();
563
564 for (from, to) in
567 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
568 {
569 let headers = self.provider().headers_range(from..=to)?;
570 for (idx, header) in headers
571 .iter()
572 .enumerate()
573 .filter(|(_, header)| filter.matches_bloom(header.logs_bloom()))
574 {
575 let block_hash = match headers.get(idx + 1) {
578 Some(child) => child.parent_hash(),
579 None => self
580 .provider()
581 .block_hash(header.number())?
582 .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
583 };
584
585 let num_hash = BlockNumHash::new(header.number(), block_hash);
586 if let Some((receipts, maybe_block)) =
587 self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
588 {
589 append_matching_block_logs(
590 &mut all_logs,
591 maybe_block
592 .map(ProviderOrBlock::Block)
593 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
594 filter,
595 num_hash,
596 &receipts,
597 false,
598 header.timestamp(),
599 )?;
600
601 let is_multi_block_range = from_block != to_block;
604 if let Some(max_logs_per_response) = limits.max_logs_per_response {
605 if is_multi_block_range && all_logs.len() > max_logs_per_response {
606 return Err(EthFilterError::QueryExceedsMaxResults {
607 max_logs: max_logs_per_response,
608 from_block,
609 to_block: num_hash.number.saturating_sub(1),
610 });
611 }
612 }
613 }
614 }
615 }
616
617 Ok(all_logs)
618 }
619}
620
621#[derive(Debug, Clone, Default)]
623pub struct ActiveFilters<T> {
624 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
625}
626
627impl<T> ActiveFilters<T> {
628 pub fn new() -> Self {
630 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
631 }
632}
633
/// A single installed filter together with its polling state.
#[derive(Debug)]
struct ActiveFilter<T> {
    /// First block that has not been reported to the client yet.
    block: u64,
    /// When the filter was installed or last polled; compared against the stale-filter TTL.
    last_poll_timestamp: Instant,
    /// What the filter reports: logs, block hashes or pending transactions.
    kind: FilterKind<T>,
}
644
645#[derive(Debug, Clone)]
647struct PendingTransactionsReceiver {
648 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
649}
650
651impl PendingTransactionsReceiver {
652 fn new(receiver: Receiver<TxHash>) -> Self {
653 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
654 }
655
656 async fn drain<T>(&self) -> FilterChanges<T> {
658 let mut pending_txs = Vec::new();
659 let mut prepared_stream = self.txs_receiver.lock().await;
660
661 while let Ok(tx_hash) = prepared_stream.try_recv() {
662 pending_txs.push(tx_hash);
663 }
664
665 FilterChanges::Hashes(pending_txs)
667 }
668}
669
670#[derive(Debug, Clone)]
672struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
673 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
674 tx_resp_builder: TxCompat,
675}
676
677impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
678where
679 T: PoolTransaction + 'static,
680 TxCompat: TransactionCompat<T::Consensus>,
681{
682 fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
684 Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
685 }
686
687 async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
689 let mut pending_txs = Vec::new();
690 let mut prepared_stream = self.txs_stream.lock().await;
691
692 while let Ok(tx) = prepared_stream.try_recv() {
693 match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
694 Ok(tx) => pending_txs.push(tx),
695 Err(err) => {
696 error!(target: "rpc",
697 %err,
698 "Failed to fill txn with block context"
699 );
700 }
701 }
702 }
703 FilterChanges::Transactions(pending_txs)
704 }
705}
706
/// Object-safe abstraction over a source of full pending transactions.
///
/// It lets [`PendingTransactionKind`] hold the receiver behind a trait object that is generic
/// only over the RPC transaction type `T`.
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns the transactions collected since the previous call.
    async fn drain(&self) -> FilterChanges<T>;
}
712
#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<TxCompat::Transaction>
    for FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: TransactionCompat<T::Consensus> + 'static,
{
    /// Delegates to the inherent [`FullTransactionsReceiver::drain`].
    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
        // The `Self::` path picks the inherent method, so this does not recurse into the trait
        // method.
        Self::drain(self).await
    }
}
724
725#[derive(Debug, Clone)]
731enum PendingTransactionKind<T> {
732 Hashes(PendingTransactionsReceiver),
733 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
734}
735
736impl<T: 'static> PendingTransactionKind<T> {
737 async fn drain(&self) -> FilterChanges<T> {
738 match self {
739 Self::Hashes(receiver) => receiver.drain().await,
740 Self::FullTransaction(receiver) => receiver.drain().await,
741 }
742 }
743}
744
/// What an installed filter reports when it is polled.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// Logs matching the boxed filter criteria.
    Log(Box<Filter>),
    /// Hashes of new canonical blocks.
    Block,
    /// Pending transactions, as hashes or as full objects.
    PendingTransaction(PendingTransactionKind<T>),
}
751
/// Splits an inclusive block range into consecutive, non-overlapping inclusive sub-ranges.
///
/// Each yielded `(start, end)` pair spans at most `step + 1` blocks; the last pair is clamped
/// to the end of the original range.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full sub-range.
    step: u64,
    /// Last block of the overall range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` in sub-ranges of `step + 1` blocks.
    ///
    /// A `step` too large for `step + 1` to fit in `usize` is reduced to the largest usable
    /// value instead of overflowing.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Distance between consecutive sub-range starts; at least 1, as `step_by` requires.
        let stride =
            usize::try_from(step).ok().and_then(|s| s.checked_add(1)).unwrap_or(usize::MAX);
        // Derive the step from the stride so the two always agree, even when it was clamped.
        let step = (stride - 1) as u64;
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturating: a range ending near `u64::MAX` must not overflow. `start` comes from the
        // range, so it never exceeds `self.end` and the pair is always well-formed.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
778
/// Errors that can occur while handling filter RPC requests.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// No filter is installed under the given id, or it is not of the requested kind.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// The requested block range has its end before its start.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// The requested block range is wider than the configured maximum (the payload).
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// The query produced more logs than allowed in a single response.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response.
        max_logs: usize,
        /// Start of the suggested retry range.
        from_block: u64,
        /// End of the suggested retry range.
        to_block: u64,
    },
    /// An error from the underlying `eth` API, passed through unchanged.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// The filter handler failed internally, e.g. a log query task ended without a result.
    #[error("internal filter error")]
    InternalError,
}
808
809impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
810 fn from(err: EthFilterError) -> Self {
811 match err {
812 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
813 jsonrpsee::types::error::INVALID_PARAMS_CODE,
814 "filter not found",
815 ),
816 err @ EthFilterError::InternalError => {
817 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
818 }
819 EthFilterError::EthAPIError(err) => err.into(),
820 err @ (EthFilterError::InvalidBlockRangeParams |
821 EthFilterError::QueryExceedsMaxBlocks(_) |
822 EthFilterError::QueryExceedsMaxResults { .. }) => {
823 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
824 }
825 }
826 }
827}
828
829impl From<ProviderError> for EthFilterError {
830 fn from(err: ProviderError) -> Self {
831 Self::EthAPIError(err.into())
832 }
833}
834
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use reth_testing_utils::generators;

    /// Checks, for a random range and step, that the sub-ranges start at the range start, are
    /// contiguous, and end exactly at the range end.
    #[test]
    fn test_block_range_iter() {
        let mut rng = generators::rng();

        // 32-bit bounds and a 16-bit step keep every sum below well inside `u64`.
        let start = rng.random::<u32>() as u64;
        let end = start.saturating_add(rng.random::<u32>() as u64);
        let step = rng.random::<u16>() as u64;
        let range = start..=end;
        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
        let (from, mut end) = iter.next().unwrap();
        assert_eq!(from, start);
        assert_eq!(end, (from + step).min(*range.end()));

        for (next_from, next_end) in iter {
            // Each sub-range starts right after the previous one ended.
            assert_eq!(next_from, end + 1);
            end = next_end;
        }

        // The last sub-range ends exactly at the end of the requested range.
        assert_eq!(end, *range.end());
    }
}