reth_discv4/lib.rs

1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a kademlia-like routing table to store and manage discovered peers and topics.
//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG
//! exchanges with discovered nodes. Nodes return the external IP address that they have observed
//! for us, and a simple majority is chosen as our external IP address. If an external IP address is
//! updated, this is produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
//! with the service via a channel. Whenever the underlying table changes, the service produces a
//! [`DiscoveryUpdate`] that listeners will receive.
13//!
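//! A minimal end-to-end sketch of the [`Discv4::spawn`] convenience setup, assuming a tokio
//! runtime and the default configuration:
//!
//! ```
//! use reth_discv4::{Discv4, Discv4Config, DEFAULT_DISCOVERY_ADDRESS};
//! use reth_network_peers::{pk2id, NodeRecord};
//! use secp256k1::SECP256K1;
//! use tokio_stream::StreamExt;
//! # async fn example() {
//! // generate a (random) keypair and derive the local node record from it
//! let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand::thread_rng());
//! let local_enr = NodeRecord::new(DEFAULT_DISCOVERY_ADDRESS, pk2id(&pk));
//!
//! // bind the socket and spawn the service, keeping only the frontend handle
//! let discv4 =
//!     Discv4::spawn(DEFAULT_DISCOVERY_ADDRESS, local_enr, secret_key, Discv4Config::default())
//!         .await
//!         .expect("failed to bind discovery socket");
//!
//! // subscribe to table updates produced by the service
//! if let Ok(mut updates) = discv4.update_stream().await {
//!     while let Some(_update) = updates.next().await {
//!         // react to `DiscoveryUpdate`s, e.g. newly added or removed nodes
//!     }
//! }
//! # }
//! ```
//!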
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt,
51    future::poll_fn,
52    io,
53    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54    pin::Pin,
55    rc::Rc,
56    sync::Arc,
57    task::{ready, Context, Poll},
58    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61    net::UdpSocket,
62    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63    task::{JoinHandle, JoinSet},
64    time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80// reexport NodeRecord primitive
81pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
/// Re-export to get the public IP.
89pub use reth_net_nat::{external_ip, NatResolver};
90
91/// The default address for discv4 via UDP
92///
93/// Note: the default TCP address is the same.
94pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96/// The default port for discv4 via UDP
97///
98/// Note: the default TCP port is the same.
99pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101/// The default address for discv4 via UDP: "0.0.0.0:30303"
102///
103/// Note: The default TCP address is the same.
104pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107/// The maximum size of any packet is 1280 bytes.
108const MAX_PACKET_SIZE: usize = 1280;
109
110/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
111const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
114const ALPHA: usize = 3;
115
/// Maximum number of nodes to ping concurrently.
117///
118/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
119/// backpressure in recursive lookups.
120const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122/// Maximum number of pings to keep queued.
123///
124/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
125/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
126/// discarded.
127///
128/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
129const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
/// The datagram size is limited to [`MAX_PACKET_SIZE`], so the 16 nodes that discv4 specifies for
/// a `Neighbours` response don't always fit in a single datagram. The safe number of nodes that
/// always fits in a datagram is 12, assuming the worst case where all of them are IPv6 nodes. This
/// is calculated by `(MAX_PACKET_SIZE - (header + expire + rlp overhead)) / size(rlp(Node_IPv6))`,
/// i.e. `(1280 - 109) / 91 = 12`.
/// Even in the best case where all nodes are IPv4, only 14 nodes fit into one packet.
136const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138/// The timeout used to identify expired nodes, 24h
139///
140/// Mirrors geth's `bondExpiration` of 24h
141const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
/// Duration used to expire nodes from the routing table, 1h
144const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
147//
// This will act as a manual yield point when draining the socket messages, where the most
// CPU-expensive part is handling outgoing messages: encoding and hashing the packet.
150const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160/// The Discv4 frontend
161///
162/// This communicates with the [`Discv4Service`] by sending commands over a channel.
163///
164/// See also [`Discv4::spawn`]
165#[derive(Debug, Clone)]
166pub struct Discv4 {
167    /// The address of the udp socket
168    local_addr: SocketAddr,
169    /// channel to send commands over to the service
170    to_service: mpsc::UnboundedSender<Discv4Command>,
171    /// Tracks the local node record.
172    ///
173    /// This includes the currently tracked external IP address of the node.
174    node_record: Arc<Mutex<NodeRecord>>,
175}
176
177// === impl Discv4 ===
178
179impl Discv4 {
180    /// Same as [`Self::bind`] but also spawns the service onto a new task,
181    /// [`Discv4Service::spawn()`]
182    pub async fn spawn(
183        local_address: SocketAddr,
184        local_enr: NodeRecord,
185        secret_key: SecretKey,
186        config: Discv4Config,
187    ) -> io::Result<Self> {
188        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190        service.spawn();
191
192        Ok(discv4)
193    }
194
    /// Returns a new noop instance that is not connected to any service.
196    ///
197    /// NOTE: this is only intended for test setups.
198    #[cfg(feature = "test-utils")]
199    pub fn noop() -> Self {
200        let (to_service, _rx) = mpsc::unbounded_channel();
201        let local_addr =
202            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203        Self {
204            local_addr,
205            to_service,
206            node_record: Arc::new(Mutex::new(NodeRecord::new(
207                "127.0.0.1:3030".parse().unwrap(),
208                PeerId::random(),
209            ))),
210        }
211    }
212
213    /// Binds a new `UdpSocket` and creates the service
214    ///
215    /// ```
216    /// # use std::io;
217    /// use rand::thread_rng;
218    /// use reth_discv4::{Discv4, Discv4Config};
219    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
220    /// use secp256k1::SECP256K1;
221    /// use std::{net::SocketAddr, str::FromStr};
222    /// # async fn t() -> io::Result<()> {
223    /// // generate a (random) keypair
224    /// let mut rng = thread_rng();
225    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng);
226    /// let id = pk2id(&pk);
227    ///
228    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
229    /// let local_enr =
230    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
231    /// let config = Discv4Config::default();
232    ///
233    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
234    ///
    /// // get an update stream
236    /// let updates = service.update_stream();
237    ///
238    /// let _handle = service.spawn();
239    ///
240    /// // lookup the local node in the DHT
241    /// let _discovered = discv4.lookup_self().await.unwrap();
242    ///
243    /// # Ok(())
244    /// # }
245    /// ```
246    pub async fn bind(
247        local_address: SocketAddr,
248        mut local_node_record: NodeRecord,
249        secret_key: SecretKey,
250        config: Discv4Config,
251    ) -> io::Result<(Self, Discv4Service)> {
252        let socket = UdpSocket::bind(local_address).await?;
253        let local_addr = socket.local_addr()?;
254        local_node_record.udp_port = local_addr.port();
255        trace!(target: "discv4", ?local_addr,"opened UDP socket");
256
257        let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
258        let discv4 = service.handle();
259        Ok((discv4, service))
260    }
261
262    /// Returns the address of the UDP socket.
263    pub const fn local_addr(&self) -> SocketAddr {
264        self.local_addr
265    }
266
267    /// Returns the [`NodeRecord`] of the local node.
268    ///
269    /// This includes the currently tracked external IP address of the node.
270    pub fn node_record(&self) -> NodeRecord {
271        *self.node_record.lock()
272    }
273
274    /// Returns the currently tracked external IP of the node.
275    pub fn external_ip(&self) -> IpAddr {
276        self.node_record.lock().address
277    }
278
279    /// Sets the [Interval] used for periodically looking up targets over the network
280    pub fn set_lookup_interval(&self, duration: Duration) {
281        self.send_to_service(Discv4Command::SetLookupInterval(duration))
282    }
283
284    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
285    ///
286    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
287    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
288    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
289    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
290    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
291    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
292    /// they do respond.
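    ///
    /// A minimal usage sketch, assuming a handle to an already spawned service:
    ///
    /// ```
    /// # async fn example(discv4: reth_discv4::Discv4) {
    /// // recursively query the network for the nodes closest to our own node id
    /// let closest = discv4.lookup_self().await.unwrap_or_default();
    /// println!("discovered {} nodes", closest.len());
    /// # }
    /// ```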
293    //
294    // If a round of FindNode queries fails to return a node any closer than the closest already
295    // seen, the initiator resends the find node to all of the k closest nodes it has not already
296    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
297    // closest nodes it has seen.
298    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
299        self.lookup_node(None).await
300    }
301
302    /// Looks up the given node id.
303    ///
    /// Returns the closest nodes to the given node id.
305    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
306        self.lookup_node(Some(node_id)).await
307    }
308
309    /// Performs a random lookup for node records.
310    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
311        let target = PeerId::random();
312        self.lookup_node(Some(target)).await
313    }
314
315    /// Sends a message to the service to lookup the closest nodes
316    pub fn send_lookup(&self, node_id: PeerId) {
317        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
318        self.send_to_service(cmd);
319    }
320
321    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
322        let (tx, rx) = oneshot::channel();
323        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
324        self.to_service.send(cmd)?;
325        Ok(rx.await?)
326    }
327
328    /// Triggers a new self lookup without expecting a response
329    pub fn send_lookup_self(&self) {
330        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
331        self.send_to_service(cmd);
332    }
333
334    /// Removes the peer from the table, if it exists.
335    pub fn remove_peer(&self, node_id: PeerId) {
336        let cmd = Discv4Command::Remove(node_id);
337        self.send_to_service(cmd);
338    }
339
340    /// Adds the node to the table, if it is not already present.
341    pub fn add_node(&self, node_record: NodeRecord) {
342        let cmd = Discv4Command::Add(node_record);
343        self.send_to_service(cmd);
344    }
345
    /// Adds the peer's id and IP address to the ban list.
347    ///
348    /// This will prevent any future inclusion in the table
349    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
350        let cmd = Discv4Command::Ban(node_id, ip);
351        self.send_to_service(cmd);
352    }
353
354    /// Adds the ip to the ban list.
355    ///
356    /// This will prevent any future inclusion in the table
357    pub fn ban_ip(&self, ip: IpAddr) {
358        let cmd = Discv4Command::BanIp(ip);
359        self.send_to_service(cmd);
360    }
361
362    /// Adds the peer to the ban list.
363    ///
364    /// This will prevent any future inclusion in the table
365    pub fn ban_node(&self, node_id: PeerId) {
366        let cmd = Discv4Command::BanPeer(node_id);
367        self.send_to_service(cmd);
368    }
369
370    /// Sets the tcp port
371    ///
372    /// This will update our [`NodeRecord`]'s tcp port.
373    pub fn set_tcp_port(&self, port: u16) {
374        let cmd = Discv4Command::SetTcpPort(port);
375        self.send_to_service(cmd);
376    }
377
378    /// Sets the pair in the EIP-868 [`Enr`] of the node.
379    ///
380    /// If the key already exists, this will update it.
381    ///
382    /// CAUTION: The value **must** be rlp encoded
383    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
384        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
385        self.send_to_service(cmd);
386    }
387
388    /// Sets the pair in the EIP-868 [`Enr`] of the node.
389    ///
390    /// If the key already exists, this will update it.
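    ///
    /// A small sketch with a purely illustrative key; any value implementing
    /// [`alloy_rlp::Encodable`] works:
    ///
    /// ```
    /// # fn example(discv4: &reth_discv4::Discv4) {
    /// // advertise a custom key/value pair in the local ENR; the value is rlp encoded internally
    /// discv4.set_eip868_rlp(b"example".to_vec(), 42u64);
    /// # }
    /// ```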
391    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
392        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
393    }
394
395    #[inline]
396    fn send_to_service(&self, cmd: Discv4Command) {
397        let _ = self.to_service.send(cmd).map_err(|err| {
398            debug!(
399                target: "discv4",
400                %err,
                "service channel closed, dropping command",
402            )
403        });
404    }
405
    /// Returns the receiver half of a new listener channel that streams [`DiscoveryUpdate`]s.
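    ///
    /// A minimal consumption sketch, assuming the service has been spawned:
    ///
    /// ```
    /// # async fn example(discv4: reth_discv4::Discv4) {
    /// use tokio_stream::StreamExt;
    ///
    /// let mut updates = discv4.update_stream().await.expect("service is running");
    /// while let Some(_update) = updates.next().await {
    ///     // handle `DiscoveryUpdate`s, e.g. newly discovered or removed nodes
    /// }
    /// # }
    /// ```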
407    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
408        let (tx, rx) = oneshot::channel();
409        let cmd = Discv4Command::Updates(tx);
410        self.to_service.send(cmd)?;
411        Ok(rx.await?)
412    }
413
414    /// Terminates the spawned [`Discv4Service`].
415    pub fn terminate(&self) {
416        self.send_to_service(Discv4Command::Terminated);
417    }
418}
419
420/// Manages discv4 peer discovery over UDP.
421///
/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via
/// [`Discv4Service::update_stream`].
424#[must_use = "Stream does nothing unless polled"]
425pub struct Discv4Service {
426    /// Local address of the UDP socket.
427    local_address: SocketAddr,
428    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
429    local_eip_868_enr: Enr<SecretKey>,
    /// The local node record of the server.
431    local_node_record: NodeRecord,
432    /// Keeps track of the node record of the local node.
433    shared_node_record: Arc<Mutex<NodeRecord>>,
434    /// The secret key used to sign payloads
435    secret_key: SecretKey,
436    /// The UDP socket for sending and receiving messages.
437    _socket: Arc<UdpSocket>,
438    /// The spawned UDP tasks.
439    ///
440    /// Note: If dropped, the spawned send+receive tasks are aborted.
441    _tasks: JoinSet<()>,
442    /// The routing table.
443    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
444    /// Receiver for incoming messages
445    ///
446    /// Receives incoming messages from the UDP task.
447    ingress: IngressReceiver,
448    /// Sender for sending outgoing messages
449    ///
    /// Sends outgoing messages to the UDP task.
451    egress: EgressSender,
452    /// Buffered pending pings to apply backpressure.
453    ///
    /// Lookups behave like bursts of requests: an endpoint proof followed by a `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple follow-up Pings + `FindNode` requests.
    /// Capping the number of concurrent `Ping`s prevents an escalation where a large number of new
    /// nodes discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s and
    /// follow-up `FindNode` requests. Buffering them effectively smooths out `Ping` peaks.
458    queued_pings: VecDeque<(NodeRecord, PingReason)>,
459    /// Currently active pings to specific nodes.
460    pending_pings: HashMap<PeerId, PingRequest>,
461    /// Currently active endpoint proof verification lookups to specific nodes.
462    ///
    /// An entry here means we've proven the peer's endpoint, but haven't yet completed our end of
    /// the endpoint proof.
465    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
466    /// Currently active `FindNode` requests
467    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
468    /// Currently active ENR requests
469    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
    /// Copy of the sender half of the commands channel for [Discv4]
471    to_service: mpsc::UnboundedSender<Discv4Command>,
472    /// Receiver half of the commands channel for [Discv4]
473    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
474    /// All subscribers for table updates
475    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
476    /// The interval when to trigger random lookups
477    lookup_interval: Interval,
478    /// Used to rotate targets to lookup
479    lookup_rotator: LookupTargetRotator,
480    /// Interval when to recheck active requests
481    evict_expired_requests_interval: Interval,
482    /// Interval when to resend pings.
483    ping_interval: Interval,
484    /// The interval at which to attempt resolving external IP again.
485    resolve_external_ip_interval: Option<ResolveNatInterval>,
    /// How this service is configured
487    config: Discv4Config,
488    /// Buffered events populated during poll.
489    queued_events: VecDeque<Discv4Event>,
490    /// Keeps track of nodes from which we have received a `Pong` message.
491    received_pongs: PongTable,
492    /// Interval used to expire additionally tracked nodes
493    expire_interval: Interval,
494}
495
496impl Discv4Service {
497    /// Create a new instance for a bound [`UdpSocket`].
498    pub(crate) fn new(
499        socket: UdpSocket,
500        local_address: SocketAddr,
501        local_node_record: NodeRecord,
502        secret_key: SecretKey,
503        config: Discv4Config,
504    ) -> Self {
505        let socket = Arc::new(socket);
506        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
507        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
508        let mut tasks = JoinSet::<()>::new();
509
510        let udp = Arc::clone(&socket);
511        tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
512
513        let udp = Arc::clone(&socket);
514        tasks.spawn(send_loop(udp, egress_rx));
515
516        let kbuckets = KBucketsTable::new(
517            NodeKey::from(&local_node_record).into(),
518            Duration::from_secs(60),
519            MAX_NODES_PER_BUCKET,
520            None,
521            None,
522        );
523
524        let self_lookup_interval = tokio::time::interval(config.lookup_interval);
525
        // Wait `ping_interval` before the first tick and then ping every `ping_interval`, so that
        // we don't start re-pinging table entries immediately on startup.
528        let ping_interval = tokio::time::interval_at(
529            tokio::time::Instant::now() + config.ping_interval,
530            config.ping_interval,
531        );
532
533        let evict_expired_requests_interval = tokio::time::interval_at(
534            tokio::time::Instant::now() + config.request_timeout,
535            config.request_timeout,
536        );
537
538        let lookup_rotator = if config.enable_dht_random_walk {
539            LookupTargetRotator::default()
540        } else {
541            LookupTargetRotator::local_only()
542        };
543
544        // for EIP-868 construct an ENR
545        let local_eip_868_enr = {
546            let mut builder = Enr::builder();
547            builder.ip(local_node_record.address);
548            if local_node_record.address.is_ipv4() {
549                builder.udp4(local_node_record.udp_port);
550                builder.tcp4(local_node_record.tcp_port);
551            } else {
552                builder.udp6(local_node_record.udp_port);
553                builder.tcp6(local_node_record.tcp_port);
554            }
555
556            for (key, val) in &config.additional_eip868_rlp_pairs {
557                builder.add_value_rlp(key, val.clone());
558            }
559            builder.build(&secret_key).expect("v4 is set")
560        };
561
562        let (to_service, commands_rx) = mpsc::unbounded_channel();
563
564        let shared_node_record = Arc::new(Mutex::new(local_node_record));
565
566        Self {
567            local_address,
568            local_eip_868_enr,
569            local_node_record,
570            shared_node_record,
571            _socket: socket,
572            kbuckets,
573            secret_key,
574            _tasks: tasks,
575            ingress: ingress_rx,
576            egress: egress_tx,
577            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
578            pending_pings: Default::default(),
579            pending_lookup: Default::default(),
580            pending_find_nodes: Default::default(),
581            pending_enr_requests: Default::default(),
582            commands_rx,
583            to_service,
584            update_listeners: Vec::with_capacity(1),
585            lookup_interval: self_lookup_interval,
586            ping_interval,
587            evict_expired_requests_interval,
588            lookup_rotator,
589            resolve_external_ip_interval: config.resolve_external_ip_interval(),
590            config,
591            queued_events: Default::default(),
592            received_pongs: Default::default(),
593            expire_interval: tokio::time::interval(EXPIRE_DURATION),
594        }
595    }
596
597    /// Returns the frontend handle that can communicate with the service via commands.
598    pub fn handle(&self) -> Discv4 {
599        Discv4 {
600            local_addr: self.local_address,
601            to_service: self.to_service.clone(),
602            node_record: self.shared_node_record.clone(),
603        }
604    }
605
    /// Returns the current sequence number of the local ENR, if EIP-868 is enabled.
607    fn enr_seq(&self) -> Option<u64> {
608        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
609    }
610
611    /// Sets the [Interval] used for periodically looking up targets over the network
612    pub fn set_lookup_interval(&mut self, duration: Duration) {
613        self.lookup_interval = tokio::time::interval(duration);
614    }
615
616    /// Sets the given ip address as the node's external IP in the node record announced in
617    /// discovery
618    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
619        if self.local_node_record.address != external_ip {
620            debug!(target: "discv4", ?external_ip, "Updating external ip");
621            self.local_node_record.address = external_ip;
622            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
623            let mut lock = self.shared_node_record.lock();
624            *lock = self.local_node_record;
625            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
626        }
627    }
628
629    /// Returns the [`PeerId`] that identifies this node
630    pub const fn local_peer_id(&self) -> &PeerId {
631        &self.local_node_record.id
632    }
633
634    /// Returns the address of the UDP socket
635    pub const fn local_addr(&self) -> SocketAddr {
636        self.local_address
637    }
638
    /// Returns the [`NodeRecord`] of this service.
640    ///
641    /// Note: this will include the external address if resolved.
642    pub const fn local_enr(&self) -> NodeRecord {
643        self.local_node_record
644    }
645
    /// Returns a mutable reference to the local node record for testing.
647    #[cfg(test)]
648    pub fn local_enr_mut(&mut self) -> &mut NodeRecord {
649        &mut self.local_node_record
650    }
651
652    /// Returns true if the given `PeerId` is currently in the bucket
653    pub fn contains_node(&self, id: PeerId) -> bool {
654        let key = kad_key(id);
655        self.kbuckets.get_index(&key).is_some()
656    }
657
658    /// Bootstraps the local node to join the DHT.
659    ///
660    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
661    /// own ID in the DHT. This introduces the local node to the other nodes
662    /// in the DHT and populates its routing table with the closest proven neighbours.
663    ///
664    /// This is similar to adding all bootnodes via [`Self::add_node`], but does not fire a
665    /// [`DiscoveryUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the
666    /// update stream, which is usually desirable, since bootnodes should not be connected to.
667    ///
668    /// If adding the configured bootnodes should result in a [`DiscoveryUpdate::Added`], see
669    /// [`Self::add_all_nodes`].
670    ///
671    /// **Note:** This is a noop if there are no bootnodes.
672    pub fn bootstrap(&mut self) {
673        for record in self.config.bootstrap_nodes.clone() {
674            debug!(target: "discv4", ?record, "pinging boot node");
675            let key = kad_key(record.id);
676            let entry = NodeEntry::new(record);
677
678            // insert the boot node in the table
679            match self.kbuckets.insert_or_update(
680                &key,
681                entry,
682                NodeStatus {
683                    state: ConnectionState::Disconnected,
684                    direction: ConnectionDirection::Outgoing,
685                },
686            ) {
687                InsertResult::Failed(_) => {}
688                _ => {
689                    self.try_ping(record, PingReason::InitialInsert);
690                }
691            }
692        }
693    }
694
    /// Spawns this service onto a new task
696    ///
697    /// Note: requires a running runtime
698    pub fn spawn(mut self) -> JoinHandle<()> {
699        tokio::task::spawn(async move {
700            self.bootstrap();
701
702            while let Some(event) = self.next().await {
703                trace!(target: "discv4", ?event, "processed");
704            }
705            trace!(target: "discv4", "service terminated");
706        })
707    }
708
709    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
710    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
711        let (tx, rx) = mpsc::channel(512);
712        self.update_listeners.push(tx);
713        ReceiverStream::new(rx)
714    }
715
716    /// Looks up the local node in the DHT.
717    pub fn lookup_self(&mut self) {
718        self.lookup(self.local_node_record.id)
719    }
720
721    /// Looks up the given node in the DHT
722    ///
723    /// A `FindNode` packet requests information about nodes close to target. The target is a
724    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
725    /// with Neighbors packets containing the closest 16 nodes to target found in its local
726    /// table.
727    //
728    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
729    // sender of FindNode has been verified by the endpoint proof procedure.
730    pub fn lookup(&mut self, target: PeerId) {
731        self.lookup_with(target, None)
732    }
733
734    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
735    ///
736    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
737    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
738    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
739    /// queries.
740    ///
741    /// This takes an optional Sender through which all successfully discovered nodes are sent once
742    /// the request has finished.
743    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
744        trace!(target: "discv4", ?target, "Starting lookup");
745        let target_key = kad_key(target);
746
747        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
748        // a valid endpoint proof
749        let ctx = LookupContext::new(
750            target_key.clone(),
751            self.kbuckets
752                .closest_values(&target_key)
753                .filter(|node| {
754                    node.value.has_endpoint_proof &&
755                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
756                })
757                .take(MAX_NODES_PER_BUCKET)
758                .map(|n| (target_key.distance(&n.key), n.value.record)),
759            tx,
760        );
761
762        // From those 16, pick the 3 closest to start the concurrent lookup.
763        let closest = ctx.closest(ALPHA);
764
765        if closest.is_empty() && self.pending_find_nodes.is_empty() {
766            // no closest nodes, and no lookup in progress: table is empty.
767            // This could happen if all records were deleted from the table due to missed pongs
768            // (e.g. connectivity problems over a long period of time, or issues during initial
769            // bootstrapping) so we attempt to bootstrap again
770            self.bootstrap();
771            return
772        }
773
774        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
775
776        for node in closest {
777            // here we still want to check against previous request failures and if necessary
778            // re-establish a new endpoint proof because it can be the case that the other node lost
779            // our entry and no longer has an endpoint proof on their end
780            self.find_node_checked(&node, ctx.clone());
781        }
782    }
783
784    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
785    ///
786    /// CAUTION: This expects there's a valid Endpoint proof to the given `node`.
787    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
788        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
789        ctx.mark_queried(node.id);
790        let id = ctx.target();
791        let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
792        self.send_packet(msg, node.udp_addr());
793        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
794    }
795
796    /// Sends a new `FindNode` packet to the node with `target` as the lookup target but checks
797    /// whether we should send a new ping first to renew the endpoint proof by checking the
798    /// previously failed findNode requests. It could be that the node is no longer reachable or
799    /// lost our entry.
800    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
801        let max_failures = self.config.max_find_node_failures;
802        let needs_ping = self
803            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
804            .unwrap_or(true);
805        if needs_ping {
806            self.try_ping(*node, PingReason::Lookup(*node, ctx))
807        } else {
808            self.find_node(node, ctx)
809        }
810    }
811
812    /// Notifies all listeners.
813    ///
814    /// Removes all listeners that are closed.
815    fn notify(&mut self, update: DiscoveryUpdate) {
816        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
817            Ok(()) => true,
818            Err(err) => match err {
819                TrySendError::Full(_) => true,
820                TrySendError::Closed(_) => false,
821            },
822        });
823    }
824
825    /// Adds the ip to the ban list indefinitely
826    pub fn ban_ip(&mut self, ip: IpAddr) {
827        self.config.ban_list.ban_ip(ip);
828    }
829
830    /// Adds the peer to the ban list indefinitely.
831    pub fn ban_node(&mut self, node_id: PeerId) {
832        self.remove_node(node_id);
833        self.config.ban_list.ban_peer(node_id);
834    }
835
836    /// Adds the ip to the ban list until the given timestamp.
837    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
838        self.config.ban_list.ban_ip_until(ip, until);
839    }
840
841    /// Adds the peer to the ban list and bans it until the given timestamp
842    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
843        self.remove_node(node_id);
844        self.config.ban_list.ban_peer_until(node_id, until);
845    }
846
847    /// Removes a `node_id` from the routing table.
848    ///
849    /// This allows applications, for whatever reason, to remove nodes from the local routing
850    /// table. Returns `true` if the node was in the table and `false` otherwise.
851    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
852        let key = kad_key(node_id);
853        self.remove_key(node_id, key)
854    }
855
856    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
857    /// bucket (bucket must be at least half full)
858    ///
859    /// Returns `true` if the node was removed
860    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
861        let key = kad_key(node_id);
862        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
863        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
864            // skip half empty bucket
865            return false
866        }
867        self.remove_key(node_id, key)
868    }
869
870    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
871        let removed = self.kbuckets.remove(&key);
872        if removed {
873            trace!(target: "discv4", ?node_id, "removed node");
874            self.notify(DiscoveryUpdate::Removed(node_id));
875        }
876        removed
877    }
878
879    /// Gets the number of entries that are considered connected.
880    pub fn num_connected(&self) -> usize {
881        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
882    }
883
884    /// Check if the peer has an active bond.
885    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
886        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
887            if timestamp.elapsed() < self.config.bond_expiration {
888                return true
889            }
890        }
891        false
892    }
893
894    /// Applies a closure on the pending or present [`NodeEntry`].
895    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
896    where
897        F: FnOnce(&NodeEntry) -> R,
898    {
899        let key = kad_key(peer_id);
900        match self.kbuckets.entry(&key) {
901            BucketEntry::Present(entry, _) => Some(f(entry.value())),
902            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
903            _ => None,
904        }
905    }
906
907    /// Update the entry on RE-ping.
908    ///
909    /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
910    ///
    /// On re-ping we check for a changed `enr_seq` if EIP-868 is enabled, and if it changed we send
    /// a follow-up request to retrieve the updated ENR.
913    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
914        if record.id == self.local_node_record.id {
915            return
916        }
917
918        // If EIP868 extension is disabled then we want to ignore this
919        if !self.config.enable_eip868 {
920            last_enr_seq = None;
921        }
922
923        let key = kad_key(record.id);
924        let old_enr = match self.kbuckets.entry(&key) {
925            kbucket::Entry::Present(mut entry, _) => {
926                entry.value_mut().update_with_enr(last_enr_seq)
927            }
928            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
929            _ => return,
930        };
931
932        // Check if ENR was updated
933        match (last_enr_seq, old_enr) {
934            (Some(new), Some(old)) => {
935                if new > old {
936                    self.send_enr_request(record);
937                }
938            }
939            (Some(_), None) => {
940                // got an ENR
941                self.send_enr_request(record);
942            }
943            _ => {}
944        };
945    }
946
947    /// Callback invoked when we receive a pong from the peer.
948    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
949        if record.id == *self.local_peer_id() {
950            return
951        }
952
953        // If EIP868 extension is disabled then we want to ignore this
954        if !self.config.enable_eip868 {
955            last_enr_seq = None;
956        }
957
        // if the peer included an enr seq in the pong then we can try to request the ENR of that
959        // node
960        let has_enr_seq = last_enr_seq.is_some();
961
962        let key = kad_key(record.id);
963        match self.kbuckets.entry(&key) {
964            kbucket::Entry::Present(mut entry, old_status) => {
965                // endpoint is now proven
966                entry.value_mut().establish_proof();
967                entry.value_mut().update_with_enr(last_enr_seq);
968
969                if !old_status.is_connected() {
970                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
971                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
972                    self.notify(DiscoveryUpdate::Added(record));
973
974                    if has_enr_seq {
975                        // request the ENR of the node
976                        self.send_enr_request(record);
977                    }
978                }
979            }
980            kbucket::Entry::Pending(mut entry, mut status) => {
981                // endpoint is now proven
982                entry.value().establish_proof();
983                entry.value().update_with_enr(last_enr_seq);
984
985                if !status.is_connected() {
986                    status.state = ConnectionState::Connected;
987                    let _ = entry.update(status);
988                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
989                    self.notify(DiscoveryUpdate::Added(record));
990
991                    if has_enr_seq {
992                        // request the ENR of the node
993                        self.send_enr_request(record);
994                    }
995                }
996            }
997            _ => {}
998        };
999    }
1000
1001    /// Adds all nodes
1002    ///
1003    /// See [`Self::add_node`]
1004    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1005        for record in records {
1006            self.add_node(record);
1007        }
1008    }
1009
1010    /// If the node's not in the table yet, this will add it to the table and start the endpoint
1011    /// proof by sending a ping to the node.
1012    ///
1013    /// Returns `true` if the record was added successfully, and `false` if the node is either
1014    /// already in the table or the record's bucket is full.
1015    pub fn add_node(&mut self, record: NodeRecord) -> bool {
1016        let key = kad_key(record.id);
1017        match self.kbuckets.entry(&key) {
1018            kbucket::Entry::Absent(entry) => {
1019                let node = NodeEntry::new(record);
1020                match entry.insert(
1021                    node,
1022                    NodeStatus {
1023                        direction: ConnectionDirection::Outgoing,
1024                        state: ConnectionState::Disconnected,
1025                    },
1026                ) {
1027                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1028                        trace!(target: "discv4", ?record, "inserted new record");
1029                    }
1030                    _ => return false,
1031                }
1032            }
1033            _ => return false,
1034        }
1035
1036        // send the initial ping to the _new_ node
1037        self.try_ping(record, PingReason::InitialInsert);
1038        true
1039    }
1040
1041    /// Encodes the packet, sends it and returns the hash.
1042    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1043        let (payload, hash) = msg.encode(&self.secret_key);
1044        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1045        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1046            debug!(
1047                target: "discv4",
1048                %err,
1049                "dropped outgoing packet",
1050            );
1051        });
1052        hash
1053    }
1054
1055    /// Message handler for an incoming `Ping`
1056    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1057        if self.is_expired(ping.expire) {
1058            // ping's expiration timestamp is in the past
1059            return
1060        }
1061
1062        // create the record
1063        let record = NodeRecord {
1064            address: remote_addr.ip(),
1065            udp_port: remote_addr.port(),
1066            tcp_port: ping.from.tcp_port,
1067            id: remote_id,
1068        }
1069        .into_ipv4_mapped();
1070
1071        let key = kad_key(record.id);
1072
1073        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
1074        // > If no communication with the sender of this ping has occurred within the last 12h, a
1075        // > ping should be sent in addition to pong in order to receive an endpoint proof.
1076        //
1077        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
1078        // the ping interval
1079        let mut is_new_insert = false;
1080        let mut needs_bond = false;
1081        let mut is_proven = false;
1082
1083        let old_enr = match self.kbuckets.entry(&key) {
1084            kbucket::Entry::Present(mut entry, _) => {
1085                if entry.value().is_expired() {
1086                    // If no communication with the sender has occurred within the last 12h, a ping
1087                    // should be sent in addition to pong in order to receive an endpoint proof.
1088                    needs_bond = true;
1089                } else {
1090                    is_proven = entry.value().has_endpoint_proof;
1091                }
1092                entry.value_mut().update_with_enr(ping.enr_sq)
1093            }
1094            kbucket::Entry::Pending(mut entry, _) => {
1095                if entry.value().is_expired() {
1096                    // If no communication with the sender has occurred within the last 12h, a ping
1097                    // should be sent in addition to pong in order to receive an endpoint proof.
1098                    needs_bond = true;
1099                } else {
1100                    is_proven = entry.value().has_endpoint_proof;
1101                }
1102                entry.value().update_with_enr(ping.enr_sq)
1103            }
1104            kbucket::Entry::Absent(entry) => {
1105                let mut node = NodeEntry::new(record);
1106                node.last_enr_seq = ping.enr_sq;
1107
1108                match entry.insert(
1109                    node,
1110                    NodeStatus {
1111                        direction: ConnectionDirection::Incoming,
1112                        // mark as disconnected until endpoint proof established on pong
1113                        state: ConnectionState::Disconnected,
1114                    },
1115                ) {
1116                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1117                        // mark as new insert if insert was successful
1118                        is_new_insert = true;
1119                    }
1120                    BucketInsertResult::Full => {
1121                        // we received a ping but the corresponding bucket for the peer is already
1122                        // full, we can't add any additional peers to that bucket, but we still want
1123                        // to emit an event that we discovered the node
1124                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1125                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1126                        needs_bond = true;
1127                    }
1128                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1129                        needs_bond = true;
1130                        // insert unsuccessful but we still want to send the pong
1131                    }
1132                    BucketInsertResult::FailedFilter => return,
1133                }
1134
1135                None
1136            }
1137            kbucket::Entry::SelfEntry => return,
1138        };
1139
        // send the pong first, but the PONG and optionally PING don't need to be sent in a
1141        // particular order
1142        let pong = Message::Pong(Pong {
1143            // we use the actual address of the peer
1144            to: record.into(),
1145            echo: hash,
1146            expire: ping.expire,
1147            enr_sq: self.enr_seq(),
1148        });
1149        self.send_packet(pong, remote_addr);
1150
1151        // if node was absent also send a ping to establish the endpoint proof from our end
1152        if is_new_insert {
1153            self.try_ping(record, PingReason::InitialInsert);
1154        } else if needs_bond {
1155            self.try_ping(record, PingReason::EstablishBond);
1156        } else if is_proven {
1157            // if node has been proven, this means we've received a pong and verified its endpoint
1158            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
1159            // send our find_nodes request if PingReason::Lookup
1160            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1161                if self.pending_find_nodes.contains_key(&record.id) {
1162                    // there's already another pending request, unmark it so the next round can
1163                    // try to send it
1164                    ctx.unmark_queried(record.id);
1165                } else {
1166                    // we just received a ping from that peer so we can send a find node request
1167                    // directly
1168                    self.find_node(&record, ctx);
1169                }
1170            }
1171        } else {
1172            // Request ENR if included in the ping
1173            match (ping.enr_sq, old_enr) {
1174                (Some(new), Some(old)) => {
1175                    if new > old {
1176                        self.send_enr_request(record);
1177                    }
1178                }
1179                (Some(_), None) => {
1180                    self.send_enr_request(record);
1181                }
1182                _ => {}
1183            };
1184        }
1185    }
1186
1187    // Guarding function for [`Self::send_ping`] that applies pre-checks
1188    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1189        if node.id == *self.local_peer_id() {
1190            // don't ping ourselves
1191            return
1192        }
1193
1194        if self.pending_pings.contains_key(&node.id) ||
1195            self.pending_find_nodes.contains_key(&node.id)
1196        {
1197            return
1198        }
1199
1200        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1201            return
1202        }
1203
1204        if self.pending_pings.len() < MAX_NODES_PING {
1205            self.send_ping(node, reason);
1206        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1207            self.queued_pings.push_back((node, reason));
1208        }
1209    }
1210
1211    /// Sends a ping message to the node's UDP address.
1212    ///
1213    /// Returns the echo hash of the ping message.
1214    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1215        let remote_addr = node.udp_addr();
1216        let id = node.id;
1217        let ping = Ping {
1218            from: self.local_node_record.into(),
1219            to: node.into(),
1220            expire: self.ping_expiration(),
1221            enr_sq: self.enr_seq(),
1222        };
1223        trace!(target: "discv4", ?ping, "sending ping");
1224        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1225
1226        self.pending_pings
1227            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1228        echo_hash
1229    }
1230
1231    /// Sends an enr request message to the node's UDP address.
1232    ///
    /// This is a noop if the EIP-868 extension is not enabled.
1234    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1235        if !self.config.enable_eip868 {
1236            return
1237        }
1238        let remote_addr = node.udp_addr();
1239        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1240
1241        trace!(target: "discv4", ?enr_request, "sending enr request");
1242        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1243
1244        self.pending_enr_requests
1245            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1246    }
1247
1248    /// Message handler for an incoming `Pong`.
1249    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1250        if self.is_expired(pong.expire) {
1251            return
1252        }
1253
1254        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1255            Entry::Occupied(entry) => {
1256                {
1257                    let request = entry.get();
1258                    if request.echo_hash != pong.echo {
1259                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1260                        return
1261                    }
1262                }
1263                entry.remove()
1264            }
1265            Entry::Vacant(_) => return,
1266        };
1267
1268        // keep track of the pong
1269        self.received_pongs.on_pong(remote_id, remote_addr.ip());
1270
1271        match reason {
1272            PingReason::InitialInsert => {
1273                self.update_on_pong(node, pong.enr_sq);
1274            }
1275            PingReason::EstablishBond => {
1276                // same as `InitialInsert` which renews the bond if the peer is in the table
1277                self.update_on_pong(node, pong.enr_sq);
1278            }
1279            PingReason::RePing => {
1280                self.update_on_reping(node, pong.enr_sq);
1281            }
1282            PingReason::Lookup(node, ctx) => {
1283                self.update_on_pong(node, pong.enr_sq);
1284                // insert node and assoc. lookup_context into the pending_lookup table to complete
1285                // our side of the endpoint proof verification.
1286                // Start the lookup timer here - and evict accordingly. Note that this is a separate
1287                // timer than the ping_request timer.
1288                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1289            }
1290        }
1291    }
1292
1293    /// Handler for an incoming `FindNode` message
1294    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1295        if self.is_expired(msg.expire) {
1296            // expiration timestamp is in the past
1297            return
1298        }
1299        if node_id == *self.local_peer_id() {
1300            // ignore find node requests to ourselves
1301            return
1302        }
1303
1304        if self.has_bond(node_id, remote_addr.ip()) {
1305            self.respond_closest(msg.id, remote_addr)
1306        }
1307    }
1308
1309    /// Handler for incoming `EnrResponse` message
1310    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1311        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1312        if let Some(resp) = self.pending_enr_requests.remove(&id) {
1313            // ensure the ENR's public key matches the expected node id
1314            let enr_id = pk2id(&msg.enr.public_key());
1315            if id != enr_id {
1316                return
1317            }
1318
1319            if resp.echo_hash == msg.request_hash {
1320                let key = kad_key(id);
1321                let fork_id = msg.eth_fork_id();
1322                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1323                    kbucket::Entry::Present(mut entry, _) => {
1324                        let id = entry.value_mut().update_with_fork_id(fork_id);
1325                        (entry.value().record, id)
1326                    }
1327                    kbucket::Entry::Pending(mut entry, _) => {
1328                        let id = entry.value().update_with_fork_id(fork_id);
1329                        (entry.value().record, id)
1330                    }
1331                    _ => return,
1332                };
1333                match (fork_id, old_fork_id) {
1334                    (Some(new), Some(old)) => {
1335                        if new != old {
1336                            self.notify(DiscoveryUpdate::EnrForkId(record, new))
1337                        }
1338                    }
1339                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1340                    _ => {}
1341                }
1342            }
1343        }
1344    }
1345
1346    /// Handler for incoming `EnrRequest` message
1347    fn on_enr_request(
1348        &self,
1349        msg: EnrRequest,
1350        remote_addr: SocketAddr,
1351        id: PeerId,
1352        request_hash: B256,
1353    ) {
1354        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1355            return
1356        }
1357
1358        if self.has_bond(id, remote_addr.ip()) {
1359            self.send_packet(
1360                Message::EnrResponse(EnrResponse {
1361                    request_hash,
1362                    enr: self.local_eip_868_enr.clone(),
1363                }),
1364                remote_addr,
1365            );
1366        }
1367    }
1368
1369    /// Handler for incoming `Neighbours` messages, which are only processed if they are responses
1370    /// to `FindNode` requests.
1371    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1372        if self.is_expired(msg.expire) {
1373            // response is expired
1374            return
1375        }
1376        // check if this request was expected
1377        let ctx = match self.pending_find_nodes.entry(node_id) {
1378            Entry::Occupied(mut entry) => {
1379                {
1380                    let request = entry.get_mut();
1381                    // Mark the request as answered
1382                    request.answered = true;
1383                    let total = request.response_count + msg.nodes.len();
1384
1385                    // The total entries of a FindNode response are capped at one bucket (MAX_NODES_PER_BUCKET = 16).
1386                    if total <= MAX_NODES_PER_BUCKET {
1387                        request.response_count = total;
1388                    } else {
1389                        trace!(target: "discv4", total, from=?remote_addr, "Received Neighbours packet whose total entries exceed max nodes per bucket");
1390                        return
1391                    }
1392                };
1393
1394                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1395                    // node responding with a full bucket of records
1396                    let ctx = entry.remove().lookup_context;
1397                    ctx.mark_responded(node_id);
1398                    ctx
1399                } else {
1400                    entry.get().lookup_context.clone()
1401                }
1402            }
1403            Entry::Vacant(_) => {
1404                // received neighbours response without requesting it
1405                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1406                return
1407            }
1408        };
1409
1410        // log the peers we discovered
1411        trace!(target: "discv4",
1412            target=format!("{:#?}", node_id),
1413            peers_count=msg.nodes.len(),
1414            peers=format!("[{:#}]", msg.nodes.iter()
1415                .map(|node_rec| node_rec.id
1416            ).format(", ")),
1417            "Received peers from Neighbours packet"
1418        );
1419
1420        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1421        // that were discovered.
1422        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1423            // prevent banned peers from being added to the context
1424            if self.config.ban_list.is_banned(&node.id, &node.address) {
1425                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1426                continue
1427            }
1428
1429            ctx.add_node(node);
1430        }
1431
1432        // get the next closest, not yet queried nodes and start over.
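        // ALPHA is the Kademlia concurrency factor: at most this many FindNode requests are kept
        // in flight per lookup at any time (3 in the original Kademlia paper; the exact value used
        // here is defined elsewhere in this file).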
1433        let closest =
1434            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1435
1436        for closest in closest {
1437            let key = kad_key(closest.id);
1438            match self.kbuckets.entry(&key) {
1439                BucketEntry::Absent(entry) => {
1440                    // the node's endpoint is not proven yet, so we need to ping it first; on
1441                    // success, we add the node to the pending_lookup table and wait until its
1442                    // endpoint proof is complete before initiating a FindNode request.
1443                    // To prevent this node from being selected again on subsequent responses
1444                    // while the ping is still active, we always mark it as queried.
1445                    ctx.mark_queried(closest.id);
1446                    let node = NodeEntry::new(closest);
1447                    match entry.insert(
1448                        node,
1449                        NodeStatus {
1450                            direction: ConnectionDirection::Outgoing,
1451                            state: ConnectionState::Disconnected,
1452                        },
1453                    ) {
1454                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1455                            // only ping if the node was added to the table
1456                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1457                        }
1458                        BucketInsertResult::Full => {
1459                            // new node but the node's bucket is already full
1460                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1461                        }
1462                        _ => {}
1463                    }
1464                }
1465                BucketEntry::SelfEntry => {
1466                    // we received our own node entry
1467                }
1468                BucketEntry::Present(entry, _) => {
1469                    if entry.value().has_endpoint_proof {
1470                        if entry
1471                            .value()
1472                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1473                        {
1474                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1475                        } else {
1476                            self.find_node(&closest, ctx.clone());
1477                        }
1478                    }
1479                }
1480                BucketEntry::Pending(mut entry, _) => {
1481                    if entry.value().has_endpoint_proof {
1482                        if entry
1483                            .value()
1484                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1485                        {
1486                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1487                        } else {
1488                            self.find_node(&closest, ctx.clone());
1489                        }
1490                    }
1491                }
1492            }
1493        }
1494    }
1495
1496    /// Sends a Neighbours packet for `target` to the given addr
1497    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1498        let key = kad_key(target);
1499        let expire = self.send_neighbours_expiration();
1500
1501        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1502        let closest_nodes =
1503            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1504
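        // A Neighbours payload must fit into a single discv4 UDP datagram (1280 bytes max per the
        // spec), so the closest nodes are split into chunks of SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS
        // records and sent as separate packets. Illustrative arithmetic: with a chunk size of 12
        // (the value commonly used by discv4 implementations), 16 records go out as two packets of
        // 12 and 4 records.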
1505        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1506            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1507            trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1508            let msg = Message::Neighbours(Neighbours { nodes, expire });
1509            self.send_packet(msg, to);
1510        }
1511    }
1512
1513    fn evict_expired_requests(&mut self, now: Instant) {
1514        self.pending_enr_requests.retain(|_node_id, enr_request| {
1515            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1516        });
1517
1518        let mut failed_pings = Vec::new();
1519        self.pending_pings.retain(|node_id, ping_request| {
1520            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1521                failed_pings.push(*node_id);
1522                return false
1523            }
1524            true
1525        });
1526
1527        if !failed_pings.is_empty() {
1528            // remove nodes that failed to pong
1529            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1530            for node_id in failed_pings {
1531                self.remove_node(node_id);
1532            }
1533        }
1534
1535        let mut failed_lookups = Vec::new();
1536        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1537            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1538                failed_lookups.push(*node_id);
1539                return false
1540            }
1541            true
1542        });
1543
1544        if !failed_lookups.is_empty() {
1545            // remove nodes that failed the e2e lookup process, so we can restart it
1546            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1547            for node_id in failed_lookups {
1548                self.remove_node(node_id);
1549            }
1550        }
1551
1552        self.evict_failed_find_nodes(now);
1553    }
1554
1555    /// Handles failed responses to `FindNode`
1556    fn evict_failed_find_nodes(&mut self, now: Instant) {
1557        let mut failed_find_nodes = Vec::new();
1558        self.pending_find_nodes.retain(|node_id, find_node_request| {
1559            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1560                if !find_node_request.answered {
1561                    // the request expired without an answer, so count it as a failed FindNode
1562                    // (partial answers are not treated as a hard error since the node responded)
1563                    failed_find_nodes.push(*node_id);
1564                }
1565                return false
1566            }
1567            true
1568        });
1569
1570        if failed_find_nodes.is_empty() {
1571            return
1572        }
1573
1574        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1575
1576        for node_id in failed_find_nodes {
1577            let key = kad_key(node_id);
1578            let failures = match self.kbuckets.entry(&key) {
1579                kbucket::Entry::Present(mut entry, _) => {
1580                    entry.value_mut().inc_failed_request();
1581                    entry.value().find_node_failures
1582                }
1583                kbucket::Entry::Pending(mut entry, _) => {
1584                    entry.value().inc_failed_request();
1585                    entry.value().find_node_failures
1586                }
1587                _ => continue,
1588            };
1589
1590            // if the node failed to respond with anything useful multiple times, remove the node
1591            // from the table, but only if there are enough other nodes in the bucket (bucket must
1592            // be at least half full)
1593            if failures > self.config.max_find_node_failures {
1594                self.soft_remove_node(node_id);
1595            }
1596        }
1597    }
1598
1599    /// Re-pings all nodes whose endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1600    ///
1601    /// This will send a `Ping` to the nodes; if a node fails to respond with a `Pong` to renew the
1602    /// endpoint proof, it will be removed from the table.
1603    fn re_ping_oldest(&mut self) {
1604        let mut nodes = self
1605            .kbuckets
1606            .iter_ref()
1607            .filter(|entry| entry.node.value.is_expired())
1608            .map(|n| n.node.value)
1609            .collect::<Vec<_>>();
1610        nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1611        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1612        for node in to_ping {
1613            self.try_ping(node, PingReason::RePing)
1614        }
1615    }
1616
1617    /// Returns true if the expiration timestamp is in the past.
1618    fn is_expired(&self, expiration: u64) -> bool {
1619        self.ensure_not_expired(expiration).is_err()
1620    }
1621
1622    /// Validate that given timestamp is not expired.
1623    ///
1624    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1625    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1626    /// be an i64.
1627    ///
1628    /// Returns an error if:
1629    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1630    ///  - timestamp is expired (lower than current local UNIX timestamp)
1631    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1632        // ensure the timestamp is a valid UNIX timestamp
1633        let _ = i64::try_from(timestamp).map_err(drop)?;
1634
1635        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1636        if self.config.enforce_expiration_timestamps && timestamp < now {
1637            trace!(target: "discv4", "Expired packet");
1638            return Err(())
1639        }
1640        Ok(())
1641    }
1642
1643    /// Pops buffered ping requests and sends them.
1644    fn ping_buffered(&mut self) {
1645        while self.pending_pings.len() < MAX_NODES_PING {
1646            match self.queued_pings.pop_front() {
1647                Some((next, reason)) => self.try_ping(next, reason),
1648                None => break,
1649            }
1650        }
1651    }
1652
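    // The expiration helpers below all follow the same pattern: the wire-level `expire` field is
    // an absolute UNIX timestamp computed as `now + <per-request timeout>`, and the receiving side
    // rejects packets whose timestamp is already in the past (see `ensure_not_expired`).
    // Illustrative example: with a 20s ping expiration, a Ping sent at t=1_700_000_000 carries
    // expire=1_700_000_020 and remains valid for up to 20 seconds of clock skew and network delay.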
1653    fn ping_expiration(&self) -> u64 {
1654        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1655            .as_secs()
1656    }
1657
1658    fn find_node_expiration(&self) -> u64 {
1659        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1660            .as_secs()
1661    }
1662
1663    fn enr_request_expiration(&self) -> u64 {
1664        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1665            .as_secs()
1666    }
1667
1668    fn send_neighbours_expiration(&self) -> u64 {
1669        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1670            .as_secs()
1671    }
1672
1673    /// Polls the socket and advances the state.
1674    ///
1675    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
1676    /// query participates in the discovery protocol. The sender of a packet is considered verified
1677    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
1678    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1679        loop {
1680            // drain buffered events first
1681            if let Some(event) = self.queued_events.pop_front() {
1682                return Poll::Ready(event)
1683            }
1684
1685            // trigger self lookup
1686            if self.config.enable_lookup {
1687                while self.lookup_interval.poll_tick(cx).is_ready() {
1688                    let target = self.lookup_rotator.next(&self.local_node_record.id);
1689                    self.lookup_with(target, None);
1690                }
1691            }
1692
1693            // re-ping some peers
1694            while self.ping_interval.poll_tick(cx).is_ready() {
1695                self.re_ping_oldest();
1696            }
1697
1698            if let Some(Poll::Ready(Some(ip))) =
1699                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1700            {
1701                self.set_external_ip_addr(ip);
1702            }
1703
1704            // drain all incoming `Discv4` commands; this channel can never close
1705            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1706                match cmd {
1707                    Discv4Command::Add(enr) => {
1708                        self.add_node(enr);
1709                    }
1710                    Discv4Command::Lookup { node_id, tx } => {
1711                        let node_id = node_id.unwrap_or(self.local_node_record.id);
1712                        self.lookup_with(node_id, tx);
1713                    }
1714                    Discv4Command::SetLookupInterval(duration) => {
1715                        self.set_lookup_interval(duration);
1716                    }
1717                    Discv4Command::Updates(tx) => {
1718                        let rx = self.update_stream();
1719                        let _ = tx.send(rx);
1720                    }
1721                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1722                    Discv4Command::Remove(node_id) => {
1723                        self.remove_node(node_id);
1724                    }
1725                    Discv4Command::Ban(node_id, ip) => {
1726                        self.ban_node(node_id);
1727                        self.ban_ip(ip);
1728                    }
1729                    Discv4Command::BanIp(ip) => {
1730                        self.ban_ip(ip);
1731                    }
1732                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
1733                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1734
1735                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1736                    }
1737                    Discv4Command::SetTcpPort(port) => {
1738                        debug!(target: "discv4", %port, "Update tcp port");
1739                        self.local_node_record.tcp_port = port;
1740                        if self.local_node_record.address.is_ipv4() {
1741                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1742                        } else {
1743                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1744                        }
1745                    }
1746
1747                    Discv4Command::Terminated => {
1748                        // terminate the service
1749                        self.queued_events.push_back(Discv4Event::Terminated);
1750                    }
1751                }
1752            }
1753
1754            // restricts how many messages we process in a single poll before yielding back control
1755            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1756
1757            // process all incoming datagrams
1758            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1759                match event {
1760                    IngressEvent::RecvError(err) => {
1761                        debug!(target: "discv4", %err, "failed to read datagram");
1762                    }
1763                    IngressEvent::BadPacket(from, err, data) => {
1764                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1765                    }
1766                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1767                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1768                        let event = match msg {
1769                            Message::Ping(ping) => {
1770                                self.on_ping(ping, remote_addr, node_id, hash);
1771                                Discv4Event::Ping
1772                            }
1773                            Message::Pong(pong) => {
1774                                self.on_pong(pong, remote_addr, node_id);
1775                                Discv4Event::Pong
1776                            }
1777                            Message::FindNode(msg) => {
1778                                self.on_find_node(msg, remote_addr, node_id);
1779                                Discv4Event::FindNode
1780                            }
1781                            Message::Neighbours(msg) => {
1782                                self.on_neighbours(msg, remote_addr, node_id);
1783                                Discv4Event::Neighbours
1784                            }
1785                            Message::EnrRequest(msg) => {
1786                                self.on_enr_request(msg, remote_addr, node_id, hash);
1787                                Discv4Event::EnrRequest
1788                            }
1789                            Message::EnrResponse(msg) => {
1790                                self.on_enr_response(msg, remote_addr, node_id);
1791                                Discv4Event::EnrResponse
1792                            }
1793                        };
1794
1795                        self.queued_events.push_back(event);
1796                    }
1797                }
1798
1799                udp_message_budget -= 1;
1800                if udp_message_budget < 0 {
1801                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1802                    if self.queued_events.is_empty() {
1803                        // we've exceeded the message budget and have no events to process
1804                        // this will make sure we're woken up again
1805                        cx.waker().wake_by_ref();
1806                    }
1807                    break
1808                }
1809            }
1810
1811            // try sending buffered pings
1812            self.ping_buffered();
1813
1814            // evict expired requests
1815            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1816                self.evict_expired_requests(Instant::now());
1817            }
1818
1819            // evict expired nodes
1820            while self.expire_interval.poll_tick(cx).is_ready() {
1821                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1822            }
1823
1824            if self.queued_events.is_empty() {
1825                return Poll::Pending
1826            }
1827        }
1828    }
1829}
1830
1831/// Stream impl that keeps producing events until the service is terminated
1832impl Stream for Discv4Service {
1833    type Item = Discv4Event;
1834
1835    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1836        // Poll the internal poll method
1837        match ready!(self.get_mut().poll(cx)) {
1838            // if the service is terminated, return None to terminate the stream
1839            Discv4Event::Terminated => Poll::Ready(None),
1840            // For any other event, return Poll::Ready(Some(event))
1841            ev => Poll::Ready(Some(ev)),
1842        }
1843    }
1844}
1845
1846impl fmt::Debug for Discv4Service {
1847    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1848        f.debug_struct("Discv4Service")
1849            .field("local_address", &self.local_address)
1850            .field("local_peer_id", &self.local_peer_id())
1851            .field("local_node_record", &self.local_node_record)
1852            .field("queued_pings", &self.queued_pings)
1853            .field("pending_lookup", &self.pending_lookup)
1854            .field("pending_find_nodes", &self.pending_find_nodes)
1855            .field("lookup_interval", &self.lookup_interval)
1856            .finish_non_exhaustive()
1857    }
1858}
1859
1860/// The Event type the Service stream produces.
1861///
1862/// This is mainly used for testing purposes and represents messages the service processed
1863#[derive(Debug, Eq, PartialEq)]
1864pub enum Discv4Event {
1865    /// A `Ping` message was handled.
1866    Ping,
1867    /// A `Pong` message was handled.
1868    Pong,
1869    /// A `FindNode` message was handled.
1870    FindNode,
1871    /// A `Neighbours` message was handled.
1872    Neighbours,
1873    /// A `EnrRequest` message was handled.
1874    EnrRequest,
1875    /// A `EnrResponse` message was handled.
1876    EnrResponse,
1877    /// Service is being terminated
1878    Terminated,
1879}
1880
1881/// Continuously reads new messages from the channel and writes them to the socket
1882pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1883    let mut stream = ReceiverStream::new(rx);
1884    while let Some((payload, to)) = stream.next().await {
1885        match udp.send_to(&payload, to).await {
1886            Ok(size) => {
1887                trace!(target: "discv4", ?to, ?size,"sent payload");
1888            }
1889            Err(err) => {
1890                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1891            }
1892        }
1893    }
1894}
1895
1896/// Maximum number of incoming packets accepted from an individual IP per minute (an average of 1 packet/second)
1897const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1898
1899/// Continuously awaits new incoming messages and sends them back through the channel.
1900///
1901/// The receive loop enforces primitive per-IP rate limiting to prevent message spam from
1902/// individual IPs.
1903pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1904    let send = |event: IngressEvent| async {
1905        let _ = tx.send(event).await.map_err(|err| {
1906            debug!(
1907                target: "discv4",
1908                 %err,
1909                "failed to send incoming packet",
1910            )
1911        });
1912    };
1913
1914    let mut cache = ReceiveCache::default();
1915
1916    // decay twice per minute: every `tick` seconds, `tick` is subtracted from each per-IP counter
1917    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1918    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
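    // Illustrative arithmetic: with a limit of 60 packets per minute, `tick` is 30, so every 30
    // seconds each per-IP counter is reduced by 30. An IP averaging at most ~1 packet/second never
    // accumulates beyond the limit, while bursts above it have their excess packets dropped until
    // the counter decays.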
1919
1920    let mut buf = [0; MAX_PACKET_SIZE];
1921    loop {
1922        let res = udp.recv_from(&mut buf).await;
1923        match res {
1924            Err(err) => {
1925                debug!(target: "discv4", %err, "Failed to read datagram.");
1926                send(IngressEvent::RecvError(err)).await;
1927            }
1928            Ok((read, remote_addr)) => {
1929                // rate limit incoming packets by IP
1930                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1931                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1932                    continue
1933                }
1934
1935                let packet = &buf[..read];
1936                match Message::decode(packet) {
1937                    Ok(packet) => {
1938                        if packet.node_id == local_id {
1939                            // received our own message
1940                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
1941                            continue
1942                        }
1943
1944                        // skip if we've already received the same packet
1945                        if cache.contains_packet(packet.hash) {
1946                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1947                            continue
1948                        }
1949
1950                        send(IngressEvent::Packet(remote_addr, packet)).await;
1951                    }
1952                    Err(err) => {
1953                        trace!(target: "discv4", %err,"Failed to decode packet");
1954                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1955                    }
1956                }
1957            }
1958        }
1959
1960        // decay the tracked IP counters if the interval has elapsed
1961        if poll_fn(|cx| match interval.poll_tick(cx) {
1962            Poll::Ready(_) => Poll::Ready(true),
1963            Poll::Pending => Poll::Ready(false),
1964        })
1965        .await
1966        {
1967            cache.tick_ips(tick);
1968        }
1969    }
1970}
1971
1972/// A cache for received packets and their source address.
1973///
1974/// This is used to discard duplicated packets and rate limit messages from the same source.
1975struct ReceiveCache {
1976    /// keeps track of how many messages we've received from a given IP address since the last
1977    /// tick.
1978    ///
1979    /// This is used to count the number of messages received from a given IP address within an
1980    /// interval.
1981    ip_messages: HashMap<IpAddr, usize>,
1982    /// keeps track of unique packet hashes
1983    unique_packets: schnellru::LruMap<B256, ()>,
1984}
1985
1986impl ReceiveCache {
1987    /// Decays the counter for each IP address and removes entries whose counter is exhausted.
1988    ///
1989    /// This subtracts `tick` from each counter and removes IPs whose counter would underflow.
1990    fn tick_ips(&mut self, tick: usize) {
1991        self.ip_messages.retain(|_, count| {
1992            if let Some(reset) = count.checked_sub(tick) {
1993                *count = reset;
1994                true
1995            } else {
1996                false
1997            }
1998        });
1999    }
2000
2001    /// Increases the counter for the given IP address and returns the new count.
2002    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2003        let ctn = self.ip_messages.entry(ip).or_default();
2004        *ctn = ctn.saturating_add(1);
2005        *ctn
2006    }
2007
2008    /// Returns true if we previously received the packet
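    ///
    /// Note that this also records the hash: it is inserted into a small LRU of recently seen
    /// packet hashes the first time it is observed, so an immediate replay of the same datagram
    /// will return `true`.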
2009    fn contains_packet(&mut self, hash: B256) -> bool {
2010        !self.unique_packets.insert(hash, ())
2011    }
2012}
2013
2014impl Default for ReceiveCache {
2015    fn default() -> Self {
2016        Self {
2017            ip_messages: Default::default(),
2018            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2019        }
2020    }
2021}
2022
2023/// The commands sent from the frontend [`Discv4`] to the service [`Discv4Service`].
2024enum Discv4Command {
2025    Add(NodeRecord),
2026    SetTcpPort(u16),
2027    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2028    Ban(PeerId, IpAddr),
2029    BanPeer(PeerId),
2030    BanIp(IpAddr),
2031    Remove(PeerId),
2032    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2033    SetLookupInterval(Duration),
2034    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2035    Terminated,
2036}
2037
2038/// Event type the receiver produces
2039#[derive(Debug)]
2040pub(crate) enum IngressEvent {
2041    /// Encountered an error when reading a datagram message.
2042    RecvError(io::Error),
2043    /// Received a bad message
2044    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2045    /// Received a datagram from an address.
2046    Packet(SocketAddr, Packet),
2047}
2048
2049/// Tracks a sent ping
2050#[derive(Debug)]
2051struct PingRequest {
2052    /// Timestamp when the request was sent.
2053    sent_at: Instant,
2054    /// Node to which the request was sent.
2055    node: NodeRecord,
2056    /// Hash sent in the Ping request.
2057    echo_hash: B256,
2058    /// Why this ping was sent.
2059    reason: PingReason,
2060}
2061
2062/// Rotates the `PeerId` that is periodically looked up.
2063///
2064/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
2065#[derive(Debug)]
2066struct LookupTargetRotator {
2067    interval: usize,
2068    counter: usize,
2069}
2070
2071// === impl LookupTargetRotator ===
2072
2073impl LookupTargetRotator {
2074    /// Returns a rotator that always returns the local target.
2075    const fn local_only() -> Self {
2076        Self { interval: 1, counter: 0 }
2077    }
2078}
2079
2080impl Default for LookupTargetRotator {
2081    fn default() -> Self {
2082        Self {
2083            // every 4th lookup is our own node
2084            interval: 4,
2085            counter: 3,
2086        }
2087    }
2088}
2089
2090impl LookupTargetRotator {
2091    /// Returns the next node id to look up.
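    ///
    /// With the default rotator (an interval of 4) the very first call returns the local id, as
    /// does every 4th call thereafter; all other calls return a random id, i.e. the produced
    /// sequence is: local, random, random, random, local, ...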
2092    fn next(&mut self, local: &PeerId) -> PeerId {
2093        self.counter += 1;
2094        self.counter %= self.interval;
2095        if self.counter == 0 {
2096            return *local
2097        }
2098        PeerId::random()
2099    }
2100}
2101
2102/// Tracks lookups across multiple `FindNode` requests.
2103///
2104/// Once all clones of this type have been dropped, it will send all the discovered nodes to the
2105/// listener, if one is present.
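/// (Delivery happens in the `Drop` impl of the inner type, once the last [`Rc`] clone of the
/// context goes away.)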
2106#[derive(Clone, Debug)]
2107struct LookupContext {
2108    inner: Rc<LookupContextInner>,
2109}
2110
2111impl LookupContext {
2112    /// Create new context for a recursive lookup
2113    fn new(
2114        target: discv5::Key<NodeKey>,
2115        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2116        listener: Option<NodeRecordSender>,
2117    ) -> Self {
2118        let closest_nodes = nearest_nodes
2119            .into_iter()
2120            .map(|(distance, record)| {
2121                (distance, QueryNode { record, queried: false, responded: false })
2122            })
2123            .collect();
2124
2125        let inner = Rc::new(LookupContextInner {
2126            target,
2127            closest_nodes: RefCell::new(closest_nodes),
2128            listener,
2129        });
2130        Self { inner }
2131    }
2132
2133    /// Returns the target of this lookup
2134    fn target(&self) -> PeerId {
2135        self.inner.target.preimage().0
2136    }
2137
2138    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2139        self.inner
2140            .closest_nodes
2141            .borrow()
2142            .iter()
2143            .filter(|(_, node)| !node.queried)
2144            .map(|(_, n)| n.record)
2145            .take(num)
2146            .collect()
2147    }
2148
2149    /// Returns the closest nodes that have not been queried yet.
2150    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2151    where
2152        P: FnMut(&NodeRecord) -> bool,
2153    {
2154        self.inner
2155            .closest_nodes
2156            .borrow()
2157            .iter()
2158            .filter(|(_, node)| !node.queried)
2159            .map(|(_, n)| n.record)
2160            .filter(filter)
2161            .take(num)
2162            .collect()
2163    }
2164
2165    /// Inserts the node if it's missing
2166    fn add_node(&self, record: NodeRecord) {
2167        let distance = self.inner.target.distance(&kad_key(record.id));
2168        let mut closest = self.inner.closest_nodes.borrow_mut();
2169        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2170            entry.insert(QueryNode { record, queried: false, responded: false });
2171        }
2172    }
2173
2174    fn set_queried(&self, id: PeerId, val: bool) {
2175        if let Some((_, node)) =
2176            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2177        {
2178            node.queried = val;
2179        }
2180    }
2181
2182    /// Marks the node as queried
2183    fn mark_queried(&self, id: PeerId) {
2184        self.set_queried(id, true)
2185    }
2186
2187    /// Marks the node as not queried
2188    fn unmark_queried(&self, id: PeerId) {
2189        self.set_queried(id, false)
2190    }
2191
2192    /// Marks the node as responded
2193    fn mark_responded(&self, id: PeerId) {
2194        if let Some((_, node)) =
2195            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2196        {
2197            node.responded = true;
2198        }
2199    }
2200}
2201
2202// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
2203// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup
2204// step, which can modify the context. The shared context is only ever accessed mutably when a
2205// `Neighbours` response is processed, and all clones are stored inside the [`Discv4Service`]; in
2206// other words, it is guaranteed that there's only one owner ([`Discv4Service`]) of all possible
2207// [`Rc`] clones of [`LookupContext`].
2208unsafe impl Send for LookupContext {}
2209#[derive(Debug)]
2210struct LookupContextInner {
2211    /// The target to lookup.
2212    target: discv5::Key<NodeKey>,
2213    /// The closest nodes
2214    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2215    /// A listener for all the nodes retrieved in this lookup
2216    ///
2217    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
2218    /// the nodes once the lookup finishes.
2219    listener: Option<NodeRecordSender>,
2220}
2221
2222impl Drop for LookupContextInner {
2223    fn drop(&mut self) {
2224        if let Some(tx) = self.listener.take() {
2225            // there's only 1 instance shared across `FindNode` requests; if this is dropped, then
2226            // all requests have finished and we can send all results back
2227            let nodes = self
2228                .closest_nodes
2229                .take()
2230                .into_values()
2231                .filter(|node| node.responded)
2232                .map(|node| node.record)
2233                .collect();
2234            let _ = tx.send(nodes);
2235        }
2236    }
2237}
2238
2239/// Tracks the state of a recursive lookup step
2240#[derive(Debug, Clone, Copy)]
2241struct QueryNode {
2242    record: NodeRecord,
2243    queried: bool,
2244    responded: bool,
2245}
2246
2247#[derive(Debug)]
2248struct FindNodeRequest {
2249    // Timestamp when the request was sent.
2250    sent_at: Instant,
2251    // Number of items sent by the node
2252    response_count: usize,
2253    // Whether the request has been answered yet.
2254    answered: bool,
2255    /// The lookup context this request belongs to.
2256    lookup_context: LookupContext,
2257}
2258
2259// === impl FindNodeRequest ===
2260
2261impl FindNodeRequest {
2262    fn new(resp: LookupContext) -> Self {
2263        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2264    }
2265}
2266
2267#[derive(Debug)]
2268struct EnrRequestState {
2269    // Timestamp when the request was sent.
2270    sent_at: Instant,
2271    // Hash of the `EnrRequest` packet that was sent
2272    echo_hash: B256,
2273}
2274
2275/// Stored node info.
2276#[derive(Debug, Clone, Eq, PartialEq)]
2277struct NodeEntry {
2278    /// Node record info.
2279    record: NodeRecord,
2280    /// Timestamp of last pong.
2281    last_seen: Instant,
2282    /// Last enr seq we retrieved via an ENR request.
2283    last_enr_seq: Option<u64>,
2284    /// `ForkId` if retrieved via ENR requests.
2285    fork_id: Option<ForkId>,
2286    /// Counter for _consecutive_ failed `FindNode` requests.
2287    find_node_failures: u8,
2288    /// Whether the endpoint of the peer is proven.
2289    has_endpoint_proof: bool,
2290}
2291
2292// === impl NodeEntry ===
2293
2294impl NodeEntry {
2295    /// Creates a new, unpopulated entry
2296    fn new(record: NodeRecord) -> Self {
2297        Self {
2298            record,
2299            last_seen: Instant::now(),
2300            last_enr_seq: None,
2301            fork_id: None,
2302            find_node_failures: 0,
2303            has_endpoint_proof: false,
2304        }
2305    }
2306
2307    #[cfg(test)]
2308    fn new_proven(record: NodeRecord) -> Self {
2309        let mut node = Self::new(record);
2310        node.has_endpoint_proof = true;
2311        node
2312    }
2313
2314    /// Marks the entry with an established proof and resets the consecutive failure counter.
2315    fn establish_proof(&mut self) {
2316        self.has_endpoint_proof = true;
2317        self.find_node_failures = 0;
2318    }
2319
2320    /// Returns true if the tracked find node failures exceed the max amount
2321    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2322        self.find_node_failures >= max_failures
2323    }
2324
2325    /// Updates the last timestamp and sets the enr seq
2326    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2327        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2328    }
2329
2330    /// Increases the failed request counter
2331    fn inc_failed_request(&mut self) {
2332        self.find_node_failures += 1;
2333    }
2334
2335    /// Updates the last timestamp and sets the fork id
2336    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2337        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2338    }
2339
2340    /// Updates the `last_seen` timestamp and calls the closure
2341    fn update_now<F, R>(&mut self, f: F) -> R
2342    where
2343        F: FnOnce(&mut Self) -> R,
2344    {
2345        self.last_seen = Instant::now();
2346        f(self)
2347    }
2348}
2349
2350// === impl NodeEntry ===
2351
2352impl NodeEntry {
2353    /// Returns true if the node should be re-pinged.
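    ///
    /// A node is considered due for a re-ping once half of the endpoint proof lifetime
    /// (`ENDPOINT_PROOF_EXPIRATION`) has elapsed since the last pong, so the bond can be renewed
    /// before it actually expires.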
2354    fn is_expired(&self) -> bool {
2355        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2356    }
2357}
2358
2359/// Represents why a ping is issued
2360#[derive(Debug)]
2361enum PingReason {
2362    /// Initial ping to a previously unknown peer that was inserted into the table.
2363    InitialInsert,
2364    /// A ping to a peer to establish a bond (endpoint proof).
2365    EstablishBond,
2366    /// Re-ping a peer.
2367    RePing,
2368    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
2369    Lookup(NodeRecord, LookupContext),
2370}
2371
2372/// Represents state changes of nodes in the underlying node table
2373#[derive(Debug, Clone)]
2374pub enum DiscoveryUpdate {
2375    /// A new node was discovered _and_ added to the table.
2376    Added(NodeRecord),
2377    /// A new node was discovered but _not_ added to the table because it is currently full.
2378    DiscoveredAtCapacity(NodeRecord),
2379    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
2380    EnrForkId(NodeRecord, ForkId),
2381    /// Node that was removed from the table
2382    Removed(PeerId),
2383    /// A series of updates
2384    Batch(Vec<DiscoveryUpdate>),
2385}
2386
2387#[cfg(test)]
2388mod tests {
2389    use super::*;
2390    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2391    use alloy_primitives::hex;
2392    use alloy_rlp::{Decodable, Encodable};
2393    use rand::{thread_rng, Rng};
2394    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2395    use reth_network_peers::mainnet_nodes;
2396    use std::future::poll_fn;
2397
2398    #[tokio::test]
2399    async fn test_configured_enr_forkid_entry() {
2400        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2401        let mut disc_conf = Discv4Config::default();
2402        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2403        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2404        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2405        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2406
2407        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2408        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2409        let expected = EnrForkIdEntry {
2410            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2411        };
2412        assert_eq!(expected, fork_entry_id);
2413        assert_eq!(expected, decoded);
2414    }
2415
2416    #[test]
2417    fn test_enr_forkid_entry_decode() {
2418        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2419        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2420        let expected = EnrForkIdEntry {
2421            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2422        };
2423        assert_eq!(expected, decoded);
2424    }
2425
2426    #[test]
2427    fn test_enr_forkid_entry_encode() {
2428        let original = EnrForkIdEntry {
2429            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2430        };
2431        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2432        let mut encoded = Vec::with_capacity(expected.len());
2433        original.encode(&mut encoded);
2434        assert_eq!(&expected[..], encoded.as_slice());
2435    }
2436
2437    #[test]
2438    fn test_local_rotator() {
2439        let id = PeerId::random();
2440        let mut rotator = LookupTargetRotator::local_only();
2441        assert_eq!(rotator.next(&id), id);
2442        assert_eq!(rotator.next(&id), id);
2443    }
2444
2445    #[test]
2446    fn test_rotator() {
2447        let id = PeerId::random();
2448        let mut rotator = LookupTargetRotator::default();
2449        assert_eq!(rotator.next(&id), id);
2450        assert_ne!(rotator.next(&id), id);
2451        assert_ne!(rotator.next(&id), id);
2452        assert_ne!(rotator.next(&id), id);
2453        assert_eq!(rotator.next(&id), id);
2454    }
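
    // A minimal sketch (not tied to any wire behaviour) of the per-IP rate-limit bookkeeping in
    // `ReceiveCache`: counters grow by one per packet via `inc_ip` and are decayed by `tick_ips`,
    // which drops entries whose counter would underflow.
    #[test]
    fn test_receive_cache_ip_decay() {
        let mut cache = ReceiveCache::default();
        let ip: IpAddr = Ipv4Addr::LOCALHOST.into();
        for _ in 0..3 {
            cache.inc_ip(ip);
        }
        assert_eq!(cache.inc_ip(ip), 4);

        // decaying by less than the current counter keeps the entry
        cache.tick_ips(1);
        assert_eq!(cache.inc_ip(ip), 4);

        // decaying by more than the remaining counter removes the entry entirely
        cache.tick_ips(10);
        assert_eq!(cache.inc_ip(ip), 1);
    }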
2455
2456    #[tokio::test]
2457    async fn test_pending_ping() {
2458        let (_, mut service) = create_discv4().await;
2459
2460        let local_addr = service.local_addr();
2461
2462        let mut num_inserted = 0;
2463        loop {
2464            let node = NodeRecord::new(local_addr, PeerId::random());
2465            if service.add_node(node) {
2466                num_inserted += 1;
2467                assert!(service.pending_pings.contains_key(&node.id));
2468                assert_eq!(service.pending_pings.len(), num_inserted);
2469                if num_inserted == MAX_NODES_PING {
2470                    break
2471                }
2472            }
2473        }
2474
2475        // `pending_pings` is full, insert into `queued_pings`.
2476        num_inserted = 0;
2477        for _ in 0..MAX_NODES_PING {
2478            let node = NodeRecord::new(local_addr, PeerId::random());
2479            if service.add_node(node) {
2480                num_inserted += 1;
2481                assert!(!service.pending_pings.contains_key(&node.id));
2482                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2483                assert_eq!(service.queued_pings.len(), num_inserted);
2484            }
2485        }
2486    }
2487
2488    // Bootstraps with mainnet boot nodes
2489    #[tokio::test(flavor = "multi_thread")]
2490    #[ignore]
2491    async fn test_mainnet_lookup() {
2492        reth_tracing::init_test_tracing();
2493        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2494
2495        let all_nodes = mainnet_nodes();
2496        let config = Discv4Config::builder()
2497            .add_boot_nodes(all_nodes)
2498            .lookup_interval(Duration::from_secs(1))
2499            .add_eip868_pair("eth", fork_id)
2500            .build();
2501        let (_discv4, mut service) = create_discv4_with_config(config).await;
2502
2503        let mut updates = service.update_stream();
2504
2505        let _handle = service.spawn();
2506
2507        let mut table = HashMap::new();
2508        while let Some(update) = updates.next().await {
2509            match update {
2510                DiscoveryUpdate::EnrForkId(record, fork_id) => {
2511                    println!("{record:?}, {fork_id:?}");
2512                }
2513                DiscoveryUpdate::Added(record) => {
2514                    table.insert(record.id, record);
2515                }
2516                DiscoveryUpdate::Removed(id) => {
2517                    table.remove(&id);
2518                }
2519                _ => {}
2520            }
2521            println!("total peers {}", table.len());
2522        }
2523    }
2524
2525    #[tokio::test]
2526    async fn test_mapped_ipv4() {
2527        reth_tracing::init_test_tracing();
2528        let mut rng = thread_rng();
2529        let config = Discv4Config::builder().build();
2530        let (_discv4, mut service) = create_discv4_with_config(config).await;
2531
2532        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2533        let v6 = v4.to_ipv6_mapped();
2534        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2535
2536        let ping = Ping {
2537            from: rng_endpoint(&mut rng),
2538            to: rng_endpoint(&mut rng),
2539            expire: service.ping_expiration(),
2540            enr_sq: Some(rng.gen()),
2541        };
2542
2543        let id = PeerId::random_with(&mut rng);
2544        service.on_ping(ping, addr, id, rng.gen());
2545
2546        let key = kad_key(id);
2547        match service.kbuckets.entry(&key) {
2548            kbucket::Entry::Present(entry, _) => {
2549                let node_addr = entry.value().record.address;
2550                assert!(node_addr.is_ipv4());
2551                assert_eq!(node_addr, IpAddr::from(v4));
2552            }
2553            _ => unreachable!(),
2554        };
2555    }
2556
2557    #[tokio::test]
2558    async fn test_respect_ping_expiration() {
2559        reth_tracing::init_test_tracing();
2560        let mut rng = thread_rng();
2561        let config = Discv4Config::builder().build();
2562        let (_discv4, mut service) = create_discv4_with_config(config).await;
2563
2564        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2565        let v6 = v4.to_ipv6_mapped();
2566        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2567
2568        let ping = Ping {
2569            from: rng_endpoint(&mut rng),
2570            to: rng_endpoint(&mut rng),
2571            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2572            enr_sq: Some(rng.gen()),
2573        };
2574
2575        let id = PeerId::random_with(&mut rng);
2576        service.on_ping(ping, addr, id, rng.gen());
2577
2578        let key = kad_key(id);
2579        match service.kbuckets.entry(&key) {
2580            kbucket::Entry::Absent(_) => {}
2581            _ => unreachable!(),
2582        };
2583    }
2584
2585    #[tokio::test]
2586    async fn test_single_lookups() {
2587        reth_tracing::init_test_tracing();
2588
2589        let config = Discv4Config::builder().build();
2590        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2591
2592        let id = PeerId::random();
2593        let key = kad_key(id);
2594        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2595
2596        let _ = service.kbuckets.insert_or_update(
2597            &key,
2598            NodeEntry::new_proven(record),
2599            NodeStatus {
2600                direction: ConnectionDirection::Incoming,
2601                state: ConnectionState::Connected,
2602            },
2603        );
2604
2605        service.lookup_self();
2606        assert_eq!(service.pending_find_nodes.len(), 1);
2607
2608        poll_fn(|cx| {
2609            let _ = service.poll(cx);
2610            assert_eq!(service.pending_find_nodes.len(), 1);
2611
2612            Poll::Ready(())
2613        })
2614        .await;
2615    }
2616
2617    #[tokio::test]
2618    async fn test_on_neighbours_recursive_lookup() {
2619        reth_tracing::init_test_tracing();
2620
2621        let config = Discv4Config::builder().build();
2622        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2623        let (_discv4, mut service2) = create_discv4_with_config(config).await;
2624
2625        let id = PeerId::random();
2626        let key = kad_key(id);
2627        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2628
2629        let _ = service.kbuckets.insert_or_update(
2630            &key,
2631            NodeEntry::new_proven(record),
2632            NodeStatus {
2633                direction: ConnectionDirection::Incoming,
2634                state: ConnectionState::Connected,
2635            },
2636        );
2637        // Needed in this test to populate self.pending_find_nodes as a prereq to a valid
2638        // on_neighbours request
2639        service.lookup_self();
2640        assert_eq!(service.pending_find_nodes.len(), 1);
2641
2642        poll_fn(|cx| {
2643            let _ = service.poll(cx);
2644            assert_eq!(service.pending_find_nodes.len(), 1);
2645
2646            Poll::Ready(())
2647        })
2648        .await;
2649
2650        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2651            10000000000000;
2652        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2653        service.on_neighbours(msg, record.tcp_addr(), id);
2654        // wait for the processed ping
2655        let event = poll_fn(|cx| service2.poll(cx)).await;
2656        assert_eq!(event, Discv4Event::Ping);
2657        // assert that no find_node req has been added here on top of the initial one, since the
2658        // endpoint proof is not yet completed on both sides here
2659        assert_eq!(service.pending_find_nodes.len(), 1);
2660        // we now wait for PONG
2661        let event = poll_fn(|cx| service.poll(cx)).await;
2662        assert_eq!(event, Discv4Event::Pong);
2663        // Ideally we want to assert against service.pending_lookup.len() here - but because the
2664        // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets
2665        // drained almost immediately - and there's no way to grab a handle to its intermediate state here
2666        // :(
2667        let event = poll_fn(|cx| service.poll(cx)).await;
2668        assert_eq!(event, Discv4Event::Ping);
2669        // assert that we've added the find_node req here after both sides of the endpoint proof are
2670        // done
2671        assert_eq!(service.pending_find_nodes.len(), 2);
2672    }
2673
2674    #[tokio::test]
2675    async fn test_no_local_in_closest() {
2676        reth_tracing::init_test_tracing();
2677
2678        let config = Discv4Config::builder().build();
2679        let (_discv4, mut service) = create_discv4_with_config(config).await;
2680
2681        let target_key = kad_key(PeerId::random());
2682
2683        let id = PeerId::random();
2684        let key = kad_key(id);
2685        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2686
2687        let _ = service.kbuckets.insert_or_update(
2688            &key,
2689            NodeEntry::new(record),
2690            NodeStatus {
2691                direction: ConnectionDirection::Incoming,
2692                state: ConnectionState::Connected,
2693            },
2694        );
2695
2696        let closest = service
2697            .kbuckets
2698            .closest_values(&target_key)
2699            .map(|n| n.value.record)
2700            .take(MAX_NODES_PER_BUCKET)
2701            .collect::<Vec<_>>();
2702
2703        assert_eq!(closest.len(), 1);
2704        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2705    }
2706
2707    #[tokio::test]
2708    async fn test_random_lookup() {
2709        reth_tracing::init_test_tracing();
2710
2711        let config = Discv4Config::builder().build();
2712        let (_discv4, mut service) = create_discv4_with_config(config).await;
2713
2714        let target = PeerId::random();
2715
2716        let id = PeerId::random();
2717        let key = kad_key(id);
2718        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2719
2720        let _ = service.kbuckets.insert_or_update(
2721            &key,
2722            NodeEntry::new_proven(record),
2723            NodeStatus {
2724                direction: ConnectionDirection::Incoming,
2725                state: ConnectionState::Connected,
2726            },
2727        );
2728
2729        service.lookup(target);
2730        assert_eq!(service.pending_find_nodes.len(), 1);
2731
2732        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2733
2734        assert_eq!(ctx.target(), target);
2735        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2736
2737        ctx.add_node(record);
2738        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2739    }
2740
2741    #[tokio::test]
2742    async fn test_reping_on_find_node_failures() {
2743        reth_tracing::init_test_tracing();
2744
2745        let config = Discv4Config::builder().build();
2746        let (_discv4, mut service) = create_discv4_with_config(config).await;
2747
2748        let target = PeerId::random();
2749
2750        let id = PeerId::random();
2751        let key = kad_key(id);
2752        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2753
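        // insert an already proven node whose find_node failure counter is maxed out, so the
        // lookup below should re-ping it instead of sending another FindNode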
2754        let mut entry = NodeEntry::new_proven(record);
2755        entry.find_node_failures = u8::MAX;
2756        let _ = service.kbuckets.insert_or_update(
2757            &key,
2758            entry,
2759            NodeStatus {
2760                direction: ConnectionDirection::Incoming,
2761                state: ConnectionState::Connected,
2762            },
2763        );
2764
2765        service.lookup(target);
2766        assert_eq!(service.pending_find_nodes.len(), 0);
2767        assert_eq!(service.pending_pings.len(), 1);
2768
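        // simulate a pong from the node, which should reset its failure counter and complete
        // the endpoint proof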
2769        service.update_on_pong(record, None);
2770
2771        service
2772            .on_entry(record.id, |entry| {
2773                // the pong resets the failure counter and proves the endpoint
2774                assert_eq!(entry.find_node_failures, 0);
2775                assert!(entry.has_endpoint_proof);
2776            })
2777            .unwrap();
2778    }
2779
2780    #[tokio::test]
2781    async fn test_service_commands() {
2782        reth_tracing::init_test_tracing();
2783
2784        let config = Discv4Config::builder().build();
2785        let (discv4, mut service) = create_discv4_with_config(config).await;
2786
2787        service.lookup_self();
2788
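        // spawn the service and trigger a self-lookup via the frontend handle, both
        // fire-and-forget and awaited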
2789        let _handle = service.spawn();
2790        discv4.send_lookup_self();
2791        let _ = discv4.lookup_self().await;
2792    }
2793
2794    #[tokio::test]
2795    async fn test_requests_timeout() {
2796        reth_tracing::init_test_tracing();
2797        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2798
2799        let config = Discv4Config::builder()
2800            .request_timeout(Duration::from_millis(200))
2801            .ping_expiration(Duration::from_millis(200))
2802            .lookup_neighbours_expiration(Duration::from_millis(200))
2803            .add_eip868_pair("eth", fork_id)
2804            .build();
2805        let (_discv4, mut service) = create_discv4_with_config(config).await;
2806
2807        let id = PeerId::random();
2808        let key = kad_key(id);
2809        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2810
2811        let _ = service.kbuckets.insert_or_update(
2812            &key,
2813            NodeEntry::new_proven(record),
2814            NodeStatus {
2815                direction: ConnectionDirection::Incoming,
2816                state: ConnectionState::Connected,
2817            },
2818        );
2819
2820        service.lookup_self();
2821        assert_eq!(service.pending_find_nodes.len(), 1);
2822
2823        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2824
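        // manually register a pending lookup (and, below, a pending ping) so that all three
        // pending tables contain an entry that can time out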
2825        service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2826
2827        assert_eq!(service.pending_lookup.len(), 1);
2828
2829        let ping = Ping {
2830            from: service.local_node_record.into(),
2831            to: record.into(),
2832            expire: service.ping_expiration(),
2833            enr_sq: service.enr_seq(),
2834        };
2835        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2836        let ping_request = PingRequest {
2837            sent_at: Instant::now(),
2838            node: record,
2839            echo_hash,
2840            reason: PingReason::InitialInsert,
2841        };
2842        service.pending_pings.insert(record.id, ping_request);
2843
2844        assert_eq!(service.pending_pings.len(), 1);
2845
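        // sleep well past the configured 200ms timeouts so the next poll evicts all pending
        // requests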
2846        tokio::time::sleep(Duration::from_secs(1)).await;
2847
2848        poll_fn(|cx| {
2849            let _ = service.poll(cx);
2850
2851            assert_eq!(service.pending_find_nodes.len(), 0);
2852            assert_eq!(service.pending_lookup.len(), 0);
2853            assert_eq!(service.pending_pings.len(), 0);
2854
2855            Poll::Ready(())
2856        })
2857        .await;
2858    }
2859
2860    // Sends a PING packet with a wrong 'to' field and expects a PONG response.
2861    #[tokio::test(flavor = "multi_thread")]
2862    async fn test_check_wrong_to() {
2863        reth_tracing::init_test_tracing();
2864
2865        let config = Discv4Config::builder().external_ip_resolver(None).build();
2866        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2867        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2868
2869        // ping node 2 with a wrong 'to' field
2870        let mut ping = Ping {
2871            from: service_1.local_node_record.into(),
2872            to: service_2.local_node_record.into(),
2873            expire: service_1.ping_expiration(),
2874            enr_sq: service_1.enr_seq(),
2875        };
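        // 192.0.2.0 is a TEST-NET-1 documentation address and therefore not node 2's actual
        // address, so the 'to' field is wrong on purpose; node 2 is still expected to reply
        // with a PONG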
2876        ping.to.address = "192.0.2.0".parse().unwrap();
2877
2878        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2879        let ping_request = PingRequest {
2880            sent_at: Instant::now(),
2881            node: service_2.local_node_record,
2882            echo_hash,
2883            reason: PingReason::InitialInsert,
2884        };
2885        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2886
2887        // wait for node 2 to process the ping
2888        let event = poll_fn(|cx| service_2.poll(cx)).await;
2889        assert_eq!(event, Discv4Event::Ping);
2890
2891        // we now wait for PONG
2892        let event = poll_fn(|cx| service_1.poll(cx)).await;
2893        assert_eq!(event, Discv4Event::Pong);
2894        // followed by a ping
2895        let event = poll_fn(|cx| service_1.poll(cx)).await;
2896        assert_eq!(event, Discv4Event::Ping);
2897    }
2898
2899    #[tokio::test(flavor = "multi_thread")]
2900    async fn test_check_ping_pong() {
2901        reth_tracing::init_test_tracing();
2902
2903        let config = Discv4Config::builder().external_ip_resolver(None).build();
2904        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2905        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2906
2907        // send ping from 1 -> 2
2908        service_1.add_node(service_2.local_node_record);
2909
2910        // wait for node 2 to process the ping
2911        let event = poll_fn(|cx| service_2.poll(cx)).await;
2912        assert_eq!(event, Discv4Event::Ping);
2913
2914        // node is now in the table but not connected yet
2915        let key1 = kad_key(*service_1.local_peer_id());
2916        match service_2.kbuckets.entry(&key1) {
2917            kbucket::Entry::Present(_entry, status) => {
2918                assert!(!status.is_connected());
2919            }
2920            _ => unreachable!(),
2921        }
2922
2923        // we now wait for PONG
2924        let event = poll_fn(|cx| service_1.poll(cx)).await;
2925        assert_eq!(event, Discv4Event::Pong);
2926
2927        // endpoint is proven
2928        let key2 = kad_key(*service_2.local_peer_id());
2929        match service_1.kbuckets.entry(&key2) {
2930            kbucket::Entry::Present(_entry, status) => {
2931                assert!(status.is_connected());
2932            }
2933            _ => unreachable!(),
2934        }
2935
2936        // we now wait for the PING initiated by 2
2937        let event = poll_fn(|cx| service_1.poll(cx)).await;
2938        assert_eq!(event, Discv4Event::Ping);
2939
2940        // we now wait for PONG
2941        let event = poll_fn(|cx| service_2.poll(cx)).await;
2942
2943        match event {
2944            Discv4Event::EnrRequest => {
2945                // since we advertise ENR support in the ping, the peer may also request our ENR
2946                let event = poll_fn(|cx| service_2.poll(cx)).await;
2947                match event {
2948                    Discv4Event::EnrRequest => {
2949                        let event = poll_fn(|cx| service_2.poll(cx)).await;
2950                        assert_eq!(event, Discv4Event::Pong);
2951                    }
2952                    Discv4Event::Pong => {}
2953                    _ => {
2954                        unreachable!()
2955                    }
2956                }
2957            }
2958            Discv4Event::Pong => {}
2959            ev => unreachable!("{ev:?}"),
2960        }
2961
2962        // endpoint is proven
2963        match service_2.kbuckets.entry(&key1) {
2964            kbucket::Entry::Present(_entry, status) => {
2965                assert!(status.is_connected());
2966            }
2967            ev => unreachable!("{ev:?}"),
2968        }
2969    }
2970
2971    #[test]
2972    fn test_insert() {
2973        let local_node_record = rng_record(&mut rand::thread_rng());
2974        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
2975            NodeKey::from(&local_node_record).into(),
2976            Duration::from_secs(60),
2977            MAX_NODES_PER_BUCKET,
2978            None,
2979            None,
2980        );
2981
2982        let new_record = rng_record(&mut rand::thread_rng());
2983        let key = kad_key(new_record.id);
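        // a key that has never been inserted resolves to an Absent entry; insert it as a
        // disconnected, outgoing node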
2984        match kbuckets.entry(&key) {
2985            kbucket::Entry::Absent(entry) => {
2986                let node = NodeEntry::new(new_record);
2987                let _ = entry.insert(
2988                    node,
2989                    NodeStatus {
2990                        direction: ConnectionDirection::Outgoing,
2991                        state: ConnectionState::Disconnected,
2992                    },
2993                );
2994            }
2995            _ => {
2996                unreachable!()
2997            }
2998        };
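        // looking up the same key again must now find a Present entry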
2999        match kbuckets.entry(&key) {
3000            kbucket::Entry::Present(_, _) => {}
3001            _ => {
3002                unreachable!()
3003            }
3004        }
3005    }
3006}