reth_network/
trusted_peers_resolver.rs

1//! Periodically resolves DNS records for a set of trusted peers and emits updates as they complete
2
3use futures::{future::BoxFuture, ready, stream::FuturesUnordered, FutureExt, StreamExt};
4use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
5use std::{
6    io,
7    task::{Context, Poll},
8};
9use tokio::time::Interval;
10use tracing::warn;
11
12/// `TrustedPeersResolver` periodically spawns DNS resolution tasks for trusted peers.
13/// It returns a resolved (`PeerId`, `NodeRecord`) update when one of its in‑flight tasks completes.
14#[derive(Debug)]
15pub struct TrustedPeersResolver {
16    /// The timer that triggers a new resolution cycle.
17    pub trusted_peers: Vec<TrustedPeer>,
18    /// The timer that triggers a new resolution cycle.
19    pub interval: Interval,
20    /// Futures for currently in‑flight resolution tasks.
21    pub pending: FuturesUnordered<BoxFuture<'static, (PeerId, Result<NodeRecord, io::Error>)>>,
22}
23
24impl TrustedPeersResolver {
25    /// Create a new resolver with the given trusted peers and resolution interval.
26    pub fn new(trusted_peers: Vec<TrustedPeer>, resolve_interval: Interval) -> Self {
27        Self { trusted_peers, interval: resolve_interval, pending: FuturesUnordered::new() }
28    }
29
30    /// Update the resolution interval (useful for testing purposes)
31    #[allow(dead_code)]
32    pub fn set_interval(&mut self, interval: Interval) {
33        self.interval = interval;
34    }
35
36    /// Poll the resolver.
37    /// When the interval ticks, new resolution futures for each trusted peer are spawned.
38    /// If a future completes successfully, it returns the resolved (`PeerId`, `NodeRecord`).
39    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(PeerId, NodeRecord)> {
40        if self.trusted_peers.is_empty() {
41            return Poll::Pending;
42        }
43
44        if self.interval.poll_tick(cx).is_ready() {
45            self.pending.clear();
46
47            for trusted in self.trusted_peers.iter().cloned() {
48                let peer_id = trusted.id;
49                let task = async move {
50                    let result = trusted.resolve().await;
51                    (peer_id, result)
52                }
53                .boxed();
54                self.pending.push(task);
55            }
56        }
57
58        match ready!(self.pending.poll_next_unpin(cx)) {
59            Some((peer_id, Ok(record))) => Poll::Ready((peer_id, record)),
60            Some((peer_id, Err(e))) => {
61                warn!(target: "net::peers", "Failed to resolve trusted peer {:?}: {:?}", peer_id, e);
62                Poll::Pending
63            }
64            None => Poll::Pending,
65        }
66    }
67}