reth_network/
trusted_peers_resolver.rs1use futures::{future::BoxFuture, ready, stream::FuturesUnordered, FutureExt, StreamExt};
4use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
5use std::{
6    io,
7    task::{Context, Poll},
8};
9use tokio::time::Interval;
10use tracing::warn;
11
12#[derive(Debug)]
15pub struct TrustedPeersResolver {
16    pub trusted_peers: Vec<TrustedPeer>,
18    pub interval: Interval,
20    pub pending: FuturesUnordered<BoxFuture<'static, (PeerId, Result<NodeRecord, io::Error>)>>,
22}
23
24impl TrustedPeersResolver {
25    pub fn new(trusted_peers: Vec<TrustedPeer>, resolve_interval: Interval) -> Self {
27        Self { trusted_peers, interval: resolve_interval, pending: FuturesUnordered::new() }
28    }
29
30    #[allow(dead_code)]
32    pub fn set_interval(&mut self, interval: Interval) {
33        self.interval = interval;
34    }
35
36    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(PeerId, NodeRecord)> {
40        if self.trusted_peers.is_empty() {
41            return Poll::Pending;
42        }
43
44        if self.interval.poll_tick(cx).is_ready() {
45            self.pending.clear();
46
47            for trusted in self.trusted_peers.iter().cloned() {
48                let peer_id = trusted.id;
49                let task = async move {
50                    let result = trusted.resolve().await;
51                    (peer_id, result)
52                }
53                .boxed();
54                self.pending.push(task);
55            }
56        }
57
58        match ready!(self.pending.poll_next_unpin(cx)) {
59            Some((peer_id, Ok(record))) => Poll::Ready((peer_id, record)),
60            Some((peer_id, Err(e))) => {
61                warn!(target: "net::peers", "Failed to resolve trusted peer {:?}: {:?}", peer_id, e);
62                Poll::Pending
63            }
64            None => Poll::Pending,
65        }
66    }
67}