reth_network/
trusted_peers_resolver.rs1use futures::{future::BoxFuture, ready, stream::FuturesUnordered, FutureExt, StreamExt};
4use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
5use std::{
6 io,
7 task::{Context, Poll},
8};
9use tokio::time::Interval;
10use tracing::warn;
11
/// Periodically re-resolves a fixed set of trusted peers into [`NodeRecord`]s.
///
/// The resolver does nothing on its own: it is driven by calling `poll`, which starts a new
/// round of resolutions whenever `interval` ticks and yields successfully resolved records.
#[derive(Debug)]
pub struct TrustedPeersResolver {
    /// The trusted peers that are resolved on every interval tick.
    pub trusted_peers: Vec<TrustedPeer>,
    /// Timer that decides when a new round of resolutions is started.
    pub interval: Interval,
    /// In-flight resolutions; each future yields the peer's id together with the outcome of
    /// resolving that peer.
    pub pending: FuturesUnordered<BoxFuture<'static, (PeerId, Result<NodeRecord, io::Error>)>>,
}
23
24impl TrustedPeersResolver {
25 pub fn new(trusted_peers: Vec<TrustedPeer>, resolve_interval: Interval) -> Self {
27 Self { trusted_peers, interval: resolve_interval, pending: FuturesUnordered::new() }
28 }
29
30 #[allow(dead_code)]
32 pub fn set_interval(&mut self, interval: Interval) {
33 self.interval = interval;
34 }
35
36 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(PeerId, NodeRecord)> {
40 if self.trusted_peers.is_empty() {
41 return Poll::Pending;
42 }
43
44 if self.interval.poll_tick(cx).is_ready() {
45 self.pending.clear();
46
47 for trusted in self.trusted_peers.iter().cloned() {
48 let peer_id = trusted.id;
49 let task = async move {
50 let result = trusted.resolve().await;
51 (peer_id, result)
52 }
53 .boxed();
54 self.pending.push(task);
55 }
56 }
57
58 match ready!(self.pending.poll_next_unpin(cx)) {
59 Some((peer_id, Ok(record))) => Poll::Ready((peer_id, record)),
60 Some((peer_id, Err(e))) => {
61 warn!(target: "net::peers", "Failed to resolve trusted peer {:?}: {:?}", peer_id, e);
62 Poll::Pending
63 }
64 None => Poll::Pending,
65 }
66 }
67}