1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt,
51 future::poll_fn,
52 io,
53 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54 pin::Pin,
55 rc::Rc,
56 sync::Arc,
57 task::{ready, Context, Poll},
58 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61 net::UdpSocket,
62 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63 task::{JoinHandle, JoinSet},
64 time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88pub use reth_net_nat::{external_ip, NatResolver};
90
91pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107const MAX_PACKET_SIZE: usize = 1280;
109
110const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113const ALPHA: usize = 3;
115
116const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160#[derive(Debug, Clone)]
166pub struct Discv4 {
167 local_addr: SocketAddr,
169 to_service: mpsc::UnboundedSender<Discv4Command>,
171 node_record: Arc<Mutex<NodeRecord>>,
175}
176
177impl Discv4 {
180 pub async fn spawn(
183 local_address: SocketAddr,
184 local_enr: NodeRecord,
185 secret_key: SecretKey,
186 config: Discv4Config,
187 ) -> io::Result<Self> {
188 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190 service.spawn();
191
192 Ok(discv4)
193 }
194
195 #[cfg(feature = "test-utils")]
199 pub fn noop() -> Self {
200 let (to_service, _rx) = mpsc::unbounded_channel();
201 let local_addr =
202 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203 Self {
204 local_addr,
205 to_service,
206 node_record: Arc::new(Mutex::new(NodeRecord::new(
207 "127.0.0.1:3030".parse().unwrap(),
208 PeerId::random(),
209 ))),
210 }
211 }
212
213 pub async fn bind(
247 local_address: SocketAddr,
248 mut local_node_record: NodeRecord,
249 secret_key: SecretKey,
250 config: Discv4Config,
251 ) -> io::Result<(Self, Discv4Service)> {
252 let socket = UdpSocket::bind(local_address).await?;
253 let local_addr = socket.local_addr()?;
254 local_node_record.udp_port = local_addr.port();
255 trace!(target: "discv4", ?local_addr,"opened UDP socket");
256
257 let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
258 let discv4 = service.handle();
259 Ok((discv4, service))
260 }
261
262 pub const fn local_addr(&self) -> SocketAddr {
264 self.local_addr
265 }
266
267 pub fn node_record(&self) -> NodeRecord {
271 *self.node_record.lock()
272 }
273
274 pub fn external_ip(&self) -> IpAddr {
276 self.node_record.lock().address
277 }
278
279 pub fn set_lookup_interval(&self, duration: Duration) {
281 self.send_to_service(Discv4Command::SetLookupInterval(duration))
282 }
283
284 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
299 self.lookup_node(None).await
300 }
301
302 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
306 self.lookup_node(Some(node_id)).await
307 }
308
309 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
311 let target = PeerId::random();
312 self.lookup_node(Some(target)).await
313 }
314
315 pub fn send_lookup(&self, node_id: PeerId) {
317 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
318 self.send_to_service(cmd);
319 }
320
321 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
322 let (tx, rx) = oneshot::channel();
323 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
324 self.to_service.send(cmd)?;
325 Ok(rx.await?)
326 }
327
328 pub fn send_lookup_self(&self) {
330 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
331 self.send_to_service(cmd);
332 }
333
334 pub fn remove_peer(&self, node_id: PeerId) {
336 let cmd = Discv4Command::Remove(node_id);
337 self.send_to_service(cmd);
338 }
339
340 pub fn add_node(&self, node_record: NodeRecord) {
342 let cmd = Discv4Command::Add(node_record);
343 self.send_to_service(cmd);
344 }
345
346 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
350 let cmd = Discv4Command::Ban(node_id, ip);
351 self.send_to_service(cmd);
352 }
353
354 pub fn ban_ip(&self, ip: IpAddr) {
358 let cmd = Discv4Command::BanIp(ip);
359 self.send_to_service(cmd);
360 }
361
362 pub fn ban_node(&self, node_id: PeerId) {
366 let cmd = Discv4Command::BanPeer(node_id);
367 self.send_to_service(cmd);
368 }
369
370 pub fn set_tcp_port(&self, port: u16) {
374 let cmd = Discv4Command::SetTcpPort(port);
375 self.send_to_service(cmd);
376 }
377
378 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
384 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
385 self.send_to_service(cmd);
386 }
387
388 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
392 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
393 }
394
395 #[inline]
396 fn send_to_service(&self, cmd: Discv4Command) {
397 let _ = self.to_service.send(cmd).map_err(|err| {
398 debug!(
399 target: "discv4",
400 %err,
401 "channel capacity reached, dropping command",
402 )
403 });
404 }
405
406 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
408 let (tx, rx) = oneshot::channel();
409 let cmd = Discv4Command::Updates(tx);
410 self.to_service.send(cmd)?;
411 Ok(rx.await?)
412 }
413
414 pub fn terminate(&self) {
416 self.send_to_service(Discv4Command::Terminated);
417 }
418}
419
420#[must_use = "Stream does nothing unless polled"]
425pub struct Discv4Service {
426 local_address: SocketAddr,
428 local_eip_868_enr: Enr<SecretKey>,
430 local_node_record: NodeRecord,
432 shared_node_record: Arc<Mutex<NodeRecord>>,
434 secret_key: SecretKey,
436 _socket: Arc<UdpSocket>,
438 _tasks: JoinSet<()>,
442 kbuckets: KBucketsTable<NodeKey, NodeEntry>,
444 ingress: IngressReceiver,
448 egress: EgressSender,
452 queued_pings: VecDeque<(NodeRecord, PingReason)>,
459 pending_pings: HashMap<PeerId, PingRequest>,
461 pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
466 pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
468 pending_enr_requests: HashMap<PeerId, EnrRequestState>,
470 to_service: mpsc::UnboundedSender<Discv4Command>,
472 commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
474 update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
476 lookup_interval: Interval,
478 lookup_rotator: LookupTargetRotator,
480 evict_expired_requests_interval: Interval,
482 ping_interval: Interval,
484 resolve_external_ip_interval: Option<ResolveNatInterval>,
486 config: Discv4Config,
488 queued_events: VecDeque<Discv4Event>,
490 received_pongs: PongTable,
492 expire_interval: Interval,
494}
495
496impl Discv4Service {
497 pub(crate) fn new(
499 socket: UdpSocket,
500 local_address: SocketAddr,
501 local_node_record: NodeRecord,
502 secret_key: SecretKey,
503 config: Discv4Config,
504 ) -> Self {
505 let socket = Arc::new(socket);
506 let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
507 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
508 let mut tasks = JoinSet::<()>::new();
509
510 let udp = Arc::clone(&socket);
511 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
512
513 let udp = Arc::clone(&socket);
514 tasks.spawn(send_loop(udp, egress_rx));
515
516 let kbuckets = KBucketsTable::new(
517 NodeKey::from(&local_node_record).into(),
518 Duration::from_secs(60),
519 MAX_NODES_PER_BUCKET,
520 None,
521 None,
522 );
523
524 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
525
526 let ping_interval = tokio::time::interval_at(
529 tokio::time::Instant::now() + config.ping_interval,
530 config.ping_interval,
531 );
532
533 let evict_expired_requests_interval = tokio::time::interval_at(
534 tokio::time::Instant::now() + config.request_timeout,
535 config.request_timeout,
536 );
537
538 let lookup_rotator = if config.enable_dht_random_walk {
539 LookupTargetRotator::default()
540 } else {
541 LookupTargetRotator::local_only()
542 };
543
544 let local_eip_868_enr = {
546 let mut builder = Enr::builder();
547 builder.ip(local_node_record.address);
548 if local_node_record.address.is_ipv4() {
549 builder.udp4(local_node_record.udp_port);
550 builder.tcp4(local_node_record.tcp_port);
551 } else {
552 builder.udp6(local_node_record.udp_port);
553 builder.tcp6(local_node_record.tcp_port);
554 }
555
556 for (key, val) in &config.additional_eip868_rlp_pairs {
557 builder.add_value_rlp(key, val.clone());
558 }
559 builder.build(&secret_key).expect("v4 is set")
560 };
561
562 let (to_service, commands_rx) = mpsc::unbounded_channel();
563
564 let shared_node_record = Arc::new(Mutex::new(local_node_record));
565
566 Self {
567 local_address,
568 local_eip_868_enr,
569 local_node_record,
570 shared_node_record,
571 _socket: socket,
572 kbuckets,
573 secret_key,
574 _tasks: tasks,
575 ingress: ingress_rx,
576 egress: egress_tx,
577 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
578 pending_pings: Default::default(),
579 pending_lookup: Default::default(),
580 pending_find_nodes: Default::default(),
581 pending_enr_requests: Default::default(),
582 commands_rx,
583 to_service,
584 update_listeners: Vec::with_capacity(1),
585 lookup_interval: self_lookup_interval,
586 ping_interval,
587 evict_expired_requests_interval,
588 lookup_rotator,
589 resolve_external_ip_interval: config.resolve_external_ip_interval(),
590 config,
591 queued_events: Default::default(),
592 received_pongs: Default::default(),
593 expire_interval: tokio::time::interval(EXPIRE_DURATION),
594 }
595 }
596
597 pub fn handle(&self) -> Discv4 {
599 Discv4 {
600 local_addr: self.local_address,
601 to_service: self.to_service.clone(),
602 node_record: self.shared_node_record.clone(),
603 }
604 }
605
606 fn enr_seq(&self) -> Option<u64> {
608 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
609 }
610
611 pub fn set_lookup_interval(&mut self, duration: Duration) {
613 self.lookup_interval = tokio::time::interval(duration);
614 }
615
616 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
619 if self.local_node_record.address != external_ip {
620 debug!(target: "discv4", ?external_ip, "Updating external ip");
621 self.local_node_record.address = external_ip;
622 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
623 let mut lock = self.shared_node_record.lock();
624 *lock = self.local_node_record;
625 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
626 }
627 }
628
629 pub const fn local_peer_id(&self) -> &PeerId {
631 &self.local_node_record.id
632 }
633
634 pub const fn local_addr(&self) -> SocketAddr {
636 self.local_address
637 }
638
639 pub const fn local_enr(&self) -> NodeRecord {
643 self.local_node_record
644 }
645
646 #[cfg(test)]
648 pub fn local_enr_mut(&mut self) -> &mut NodeRecord {
649 &mut self.local_node_record
650 }
651
652 pub fn contains_node(&self, id: PeerId) -> bool {
654 let key = kad_key(id);
655 self.kbuckets.get_index(&key).is_some()
656 }
657
658 pub fn bootstrap(&mut self) {
673 for record in self.config.bootstrap_nodes.clone() {
674 debug!(target: "discv4", ?record, "pinging boot node");
675 let key = kad_key(record.id);
676 let entry = NodeEntry::new(record);
677
678 match self.kbuckets.insert_or_update(
680 &key,
681 entry,
682 NodeStatus {
683 state: ConnectionState::Disconnected,
684 direction: ConnectionDirection::Outgoing,
685 },
686 ) {
687 InsertResult::Failed(_) => {}
688 _ => {
689 self.try_ping(record, PingReason::InitialInsert);
690 }
691 }
692 }
693 }
694
695 pub fn spawn(mut self) -> JoinHandle<()> {
699 tokio::task::spawn(async move {
700 self.bootstrap();
701
702 while let Some(event) = self.next().await {
703 trace!(target: "discv4", ?event, "processed");
704 }
705 trace!(target: "discv4", "service terminated");
706 })
707 }
708
709 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
711 let (tx, rx) = mpsc::channel(512);
712 self.update_listeners.push(tx);
713 ReceiverStream::new(rx)
714 }
715
716 pub fn lookup_self(&mut self) {
718 self.lookup(self.local_node_record.id)
719 }
720
721 pub fn lookup(&mut self, target: PeerId) {
731 self.lookup_with(target, None)
732 }
733
734 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
744 trace!(target: "discv4", ?target, "Starting lookup");
745 let target_key = kad_key(target);
746
747 let ctx = LookupContext::new(
750 target_key.clone(),
751 self.kbuckets
752 .closest_values(&target_key)
753 .filter(|node| {
754 node.value.has_endpoint_proof &&
755 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
756 })
757 .take(MAX_NODES_PER_BUCKET)
758 .map(|n| (target_key.distance(&n.key), n.value.record)),
759 tx,
760 );
761
762 let closest = ctx.closest(ALPHA);
764
765 if closest.is_empty() && self.pending_find_nodes.is_empty() {
766 self.bootstrap();
771 return
772 }
773
774 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
775
776 for node in closest {
777 self.find_node_checked(&node, ctx.clone());
781 }
782 }
783
784 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
788 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
789 ctx.mark_queried(node.id);
790 let id = ctx.target();
791 let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
792 self.send_packet(msg, node.udp_addr());
793 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
794 }
795
796 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
801 let max_failures = self.config.max_find_node_failures;
802 let needs_ping = self
803 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
804 .unwrap_or(true);
805 if needs_ping {
806 self.try_ping(*node, PingReason::Lookup(*node, ctx))
807 } else {
808 self.find_node(node, ctx)
809 }
810 }
811
812 fn notify(&mut self, update: DiscoveryUpdate) {
816 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
817 Ok(()) => true,
818 Err(err) => match err {
819 TrySendError::Full(_) => true,
820 TrySendError::Closed(_) => false,
821 },
822 });
823 }
824
825 pub fn ban_ip(&mut self, ip: IpAddr) {
827 self.config.ban_list.ban_ip(ip);
828 }
829
830 pub fn ban_node(&mut self, node_id: PeerId) {
832 self.remove_node(node_id);
833 self.config.ban_list.ban_peer(node_id);
834 }
835
836 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
838 self.config.ban_list.ban_ip_until(ip, until);
839 }
840
841 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
843 self.remove_node(node_id);
844 self.config.ban_list.ban_peer_until(node_id, until);
845 }
846
847 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
852 let key = kad_key(node_id);
853 self.remove_key(node_id, key)
854 }
855
856 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
861 let key = kad_key(node_id);
862 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
863 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
864 return false
866 }
867 self.remove_key(node_id, key)
868 }
869
870 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
871 let removed = self.kbuckets.remove(&key);
872 if removed {
873 trace!(target: "discv4", ?node_id, "removed node");
874 self.notify(DiscoveryUpdate::Removed(node_id));
875 }
876 removed
877 }
878
879 pub fn num_connected(&self) -> usize {
881 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
882 }
883
884 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
886 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
887 if timestamp.elapsed() < self.config.bond_expiration {
888 return true
889 }
890 }
891 false
892 }
893
894 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
896 where
897 F: FnOnce(&NodeEntry) -> R,
898 {
899 let key = kad_key(peer_id);
900 match self.kbuckets.entry(&key) {
901 BucketEntry::Present(entry, _) => Some(f(entry.value())),
902 BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
903 _ => None,
904 }
905 }
906
907 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
914 if record.id == self.local_node_record.id {
915 return
916 }
917
918 if !self.config.enable_eip868 {
920 last_enr_seq = None;
921 }
922
923 let key = kad_key(record.id);
924 let old_enr = match self.kbuckets.entry(&key) {
925 kbucket::Entry::Present(mut entry, _) => {
926 entry.value_mut().update_with_enr(last_enr_seq)
927 }
928 kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
929 _ => return,
930 };
931
932 match (last_enr_seq, old_enr) {
934 (Some(new), Some(old)) => {
935 if new > old {
936 self.send_enr_request(record);
937 }
938 }
939 (Some(_), None) => {
940 self.send_enr_request(record);
942 }
943 _ => {}
944 };
945 }
946
947 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
949 if record.id == *self.local_peer_id() {
950 return
951 }
952
953 if !self.config.enable_eip868 {
955 last_enr_seq = None;
956 }
957
958 let has_enr_seq = last_enr_seq.is_some();
961
962 let key = kad_key(record.id);
963 match self.kbuckets.entry(&key) {
964 kbucket::Entry::Present(mut entry, old_status) => {
965 entry.value_mut().establish_proof();
967 entry.value_mut().update_with_enr(last_enr_seq);
968
969 if !old_status.is_connected() {
970 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
971 trace!(target: "discv4", ?record, "added after successful endpoint proof");
972 self.notify(DiscoveryUpdate::Added(record));
973
974 if has_enr_seq {
975 self.send_enr_request(record);
977 }
978 }
979 }
980 kbucket::Entry::Pending(mut entry, mut status) => {
981 entry.value().establish_proof();
983 entry.value().update_with_enr(last_enr_seq);
984
985 if !status.is_connected() {
986 status.state = ConnectionState::Connected;
987 let _ = entry.update(status);
988 trace!(target: "discv4", ?record, "added after successful endpoint proof");
989 self.notify(DiscoveryUpdate::Added(record));
990
991 if has_enr_seq {
992 self.send_enr_request(record);
994 }
995 }
996 }
997 _ => {}
998 };
999 }
1000
1001 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1005 for record in records {
1006 self.add_node(record);
1007 }
1008 }
1009
1010 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1016 let key = kad_key(record.id);
1017 match self.kbuckets.entry(&key) {
1018 kbucket::Entry::Absent(entry) => {
1019 let node = NodeEntry::new(record);
1020 match entry.insert(
1021 node,
1022 NodeStatus {
1023 direction: ConnectionDirection::Outgoing,
1024 state: ConnectionState::Disconnected,
1025 },
1026 ) {
1027 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1028 trace!(target: "discv4", ?record, "inserted new record");
1029 }
1030 _ => return false,
1031 }
1032 }
1033 _ => return false,
1034 }
1035
1036 self.try_ping(record, PingReason::InitialInsert);
1038 true
1039 }
1040
1041 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1043 let (payload, hash) = msg.encode(&self.secret_key);
1044 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1045 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1046 debug!(
1047 target: "discv4",
1048 %err,
1049 "dropped outgoing packet",
1050 );
1051 });
1052 hash
1053 }
1054
1055 fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1057 if self.is_expired(ping.expire) {
1058 return
1060 }
1061
1062 let record = NodeRecord {
1064 address: remote_addr.ip(),
1065 udp_port: remote_addr.port(),
1066 tcp_port: ping.from.tcp_port,
1067 id: remote_id,
1068 }
1069 .into_ipv4_mapped();
1070
1071 let key = kad_key(record.id);
1072
1073 let mut is_new_insert = false;
1080 let mut needs_bond = false;
1081 let mut is_proven = false;
1082
1083 let old_enr = match self.kbuckets.entry(&key) {
1084 kbucket::Entry::Present(mut entry, _) => {
1085 if entry.value().is_expired() {
1086 needs_bond = true;
1089 } else {
1090 is_proven = entry.value().has_endpoint_proof;
1091 }
1092 entry.value_mut().update_with_enr(ping.enr_sq)
1093 }
1094 kbucket::Entry::Pending(mut entry, _) => {
1095 if entry.value().is_expired() {
1096 needs_bond = true;
1099 } else {
1100 is_proven = entry.value().has_endpoint_proof;
1101 }
1102 entry.value().update_with_enr(ping.enr_sq)
1103 }
1104 kbucket::Entry::Absent(entry) => {
1105 let mut node = NodeEntry::new(record);
1106 node.last_enr_seq = ping.enr_sq;
1107
1108 match entry.insert(
1109 node,
1110 NodeStatus {
1111 direction: ConnectionDirection::Incoming,
1112 state: ConnectionState::Disconnected,
1114 },
1115 ) {
1116 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1117 is_new_insert = true;
1119 }
1120 BucketInsertResult::Full => {
1121 trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1125 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1126 needs_bond = true;
1127 }
1128 BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1129 needs_bond = true;
1130 }
1132 BucketInsertResult::FailedFilter => return,
1133 }
1134
1135 None
1136 }
1137 kbucket::Entry::SelfEntry => return,
1138 };
1139
1140 let pong = Message::Pong(Pong {
1143 to: record.into(),
1145 echo: hash,
1146 expire: ping.expire,
1147 enr_sq: self.enr_seq(),
1148 });
1149 self.send_packet(pong, remote_addr);
1150
1151 if is_new_insert {
1153 self.try_ping(record, PingReason::InitialInsert);
1154 } else if needs_bond {
1155 self.try_ping(record, PingReason::EstablishBond);
1156 } else if is_proven {
1157 if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1161 if self.pending_find_nodes.contains_key(&record.id) {
1162 ctx.unmark_queried(record.id);
1165 } else {
1166 self.find_node(&record, ctx);
1169 }
1170 }
1171 } else {
1172 match (ping.enr_sq, old_enr) {
1174 (Some(new), Some(old)) => {
1175 if new > old {
1176 self.send_enr_request(record);
1177 }
1178 }
1179 (Some(_), None) => {
1180 self.send_enr_request(record);
1181 }
1182 _ => {}
1183 };
1184 }
1185 }
1186
1187 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1189 if node.id == *self.local_peer_id() {
1190 return
1192 }
1193
1194 if self.pending_pings.contains_key(&node.id) ||
1195 self.pending_find_nodes.contains_key(&node.id)
1196 {
1197 return
1198 }
1199
1200 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1201 return
1202 }
1203
1204 if self.pending_pings.len() < MAX_NODES_PING {
1205 self.send_ping(node, reason);
1206 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1207 self.queued_pings.push_back((node, reason));
1208 }
1209 }
1210
1211 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1215 let remote_addr = node.udp_addr();
1216 let id = node.id;
1217 let ping = Ping {
1218 from: self.local_node_record.into(),
1219 to: node.into(),
1220 expire: self.ping_expiration(),
1221 enr_sq: self.enr_seq(),
1222 };
1223 trace!(target: "discv4", ?ping, "sending ping");
1224 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1225
1226 self.pending_pings
1227 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1228 echo_hash
1229 }
1230
1231 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1235 if !self.config.enable_eip868 {
1236 return
1237 }
1238 let remote_addr = node.udp_addr();
1239 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1240
1241 trace!(target: "discv4", ?enr_request, "sending enr request");
1242 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1243
1244 self.pending_enr_requests
1245 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1246 }
1247
1248 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1250 if self.is_expired(pong.expire) {
1251 return
1252 }
1253
1254 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1255 Entry::Occupied(entry) => {
1256 {
1257 let request = entry.get();
1258 if request.echo_hash != pong.echo {
1259 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1260 return
1261 }
1262 }
1263 entry.remove()
1264 }
1265 Entry::Vacant(_) => return,
1266 };
1267
1268 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1270
1271 match reason {
1272 PingReason::InitialInsert => {
1273 self.update_on_pong(node, pong.enr_sq);
1274 }
1275 PingReason::EstablishBond => {
1276 self.update_on_pong(node, pong.enr_sq);
1278 }
1279 PingReason::RePing => {
1280 self.update_on_reping(node, pong.enr_sq);
1281 }
1282 PingReason::Lookup(node, ctx) => {
1283 self.update_on_pong(node, pong.enr_sq);
1284 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1289 }
1290 }
1291 }
1292
1293 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1295 if self.is_expired(msg.expire) {
1296 return
1298 }
1299 if node_id == *self.local_peer_id() {
1300 return
1302 }
1303
1304 if self.has_bond(node_id, remote_addr.ip()) {
1305 self.respond_closest(msg.id, remote_addr)
1306 }
1307 }
1308
1309 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1311 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1312 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1313 let enr_id = pk2id(&msg.enr.public_key());
1315 if id != enr_id {
1316 return
1317 }
1318
1319 if resp.echo_hash == msg.request_hash {
1320 let key = kad_key(id);
1321 let fork_id = msg.eth_fork_id();
1322 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1323 kbucket::Entry::Present(mut entry, _) => {
1324 let id = entry.value_mut().update_with_fork_id(fork_id);
1325 (entry.value().record, id)
1326 }
1327 kbucket::Entry::Pending(mut entry, _) => {
1328 let id = entry.value().update_with_fork_id(fork_id);
1329 (entry.value().record, id)
1330 }
1331 _ => return,
1332 };
1333 match (fork_id, old_fork_id) {
1334 (Some(new), Some(old)) => {
1335 if new != old {
1336 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1337 }
1338 }
1339 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1340 _ => {}
1341 }
1342 }
1343 }
1344 }
1345
1346 fn on_enr_request(
1348 &self,
1349 msg: EnrRequest,
1350 remote_addr: SocketAddr,
1351 id: PeerId,
1352 request_hash: B256,
1353 ) {
1354 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1355 return
1356 }
1357
1358 if self.has_bond(id, remote_addr.ip()) {
1359 self.send_packet(
1360 Message::EnrResponse(EnrResponse {
1361 request_hash,
1362 enr: self.local_eip_868_enr.clone(),
1363 }),
1364 remote_addr,
1365 );
1366 }
1367 }
1368
1369 fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1372 if self.is_expired(msg.expire) {
1373 return
1375 }
1376 let ctx = match self.pending_find_nodes.entry(node_id) {
1378 Entry::Occupied(mut entry) => {
1379 {
1380 let request = entry.get_mut();
1381 request.answered = true;
1383 let total = request.response_count + msg.nodes.len();
1384
1385 if total <= MAX_NODES_PER_BUCKET {
1387 request.response_count = total;
1388 } else {
1389 trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1390 return
1391 }
1392 };
1393
1394 if entry.get().response_count == MAX_NODES_PER_BUCKET {
1395 let ctx = entry.remove().lookup_context;
1397 ctx.mark_responded(node_id);
1398 ctx
1399 } else {
1400 entry.get().lookup_context.clone()
1401 }
1402 }
1403 Entry::Vacant(_) => {
1404 trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1406 return
1407 }
1408 };
1409
1410 trace!(target: "discv4",
1412 target=format!("{:#?}", node_id),
1413 peers_count=msg.nodes.len(),
1414 peers=format!("[{:#}]", msg.nodes.iter()
1415 .map(|node_rec| node_rec.id
1416 ).format(", ")),
1417 "Received peers from Neighbours packet"
1418 );
1419
1420 for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1423 if self.config.ban_list.is_banned(&node.id, &node.address) {
1425 trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1426 continue
1427 }
1428
1429 ctx.add_node(node);
1430 }
1431
1432 let closest =
1434 ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1435
1436 for closest in closest {
1437 let key = kad_key(closest.id);
1438 match self.kbuckets.entry(&key) {
1439 BucketEntry::Absent(entry) => {
1440 ctx.mark_queried(closest.id);
1446 let node = NodeEntry::new(closest);
1447 match entry.insert(
1448 node,
1449 NodeStatus {
1450 direction: ConnectionDirection::Outgoing,
1451 state: ConnectionState::Disconnected,
1452 },
1453 ) {
1454 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1455 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1457 }
1458 BucketInsertResult::Full => {
1459 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1461 }
1462 _ => {}
1463 }
1464 }
1465 BucketEntry::SelfEntry => {
1466 }
1468 BucketEntry::Present(entry, _) => {
1469 if entry.value().has_endpoint_proof {
1470 if entry
1471 .value()
1472 .exceeds_find_node_failures(self.config.max_find_node_failures)
1473 {
1474 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1475 } else {
1476 self.find_node(&closest, ctx.clone());
1477 }
1478 }
1479 }
1480 BucketEntry::Pending(mut entry, _) => {
1481 if entry.value().has_endpoint_proof {
1482 if entry
1483 .value()
1484 .exceeds_find_node_failures(self.config.max_find_node_failures)
1485 {
1486 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1487 } else {
1488 self.find_node(&closest, ctx.clone());
1489 }
1490 }
1491 }
1492 }
1493 }
1494 }
1495
1496 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1498 let key = kad_key(target);
1499 let expire = self.send_neighbours_expiration();
1500
1501 let closest_nodes =
1503 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1504
1505 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1506 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1507 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1508 let msg = Message::Neighbours(Neighbours { nodes, expire });
1509 self.send_packet(msg, to);
1510 }
1511 }
1512
1513 fn evict_expired_requests(&mut self, now: Instant) {
1514 self.pending_enr_requests.retain(|_node_id, enr_request| {
1515 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1516 });
1517
1518 let mut failed_pings = Vec::new();
1519 self.pending_pings.retain(|node_id, ping_request| {
1520 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1521 failed_pings.push(*node_id);
1522 return false
1523 }
1524 true
1525 });
1526
1527 if !failed_pings.is_empty() {
1528 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1530 for node_id in failed_pings {
1531 self.remove_node(node_id);
1532 }
1533 }
1534
1535 let mut failed_lookups = Vec::new();
1536 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1537 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1538 failed_lookups.push(*node_id);
1539 return false
1540 }
1541 true
1542 });
1543
1544 if !failed_lookups.is_empty() {
1545 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1547 for node_id in failed_lookups {
1548 self.remove_node(node_id);
1549 }
1550 }
1551
1552 self.evict_failed_find_nodes(now);
1553 }
1554
1555 fn evict_failed_find_nodes(&mut self, now: Instant) {
1557 let mut failed_find_nodes = Vec::new();
1558 self.pending_find_nodes.retain(|node_id, find_node_request| {
1559 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1560 if !find_node_request.answered {
1561 failed_find_nodes.push(*node_id);
1564 }
1565 return false
1566 }
1567 true
1568 });
1569
1570 if failed_find_nodes.is_empty() {
1571 return
1572 }
1573
1574 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1575
1576 for node_id in failed_find_nodes {
1577 let key = kad_key(node_id);
1578 let failures = match self.kbuckets.entry(&key) {
1579 kbucket::Entry::Present(mut entry, _) => {
1580 entry.value_mut().inc_failed_request();
1581 entry.value().find_node_failures
1582 }
1583 kbucket::Entry::Pending(mut entry, _) => {
1584 entry.value().inc_failed_request();
1585 entry.value().find_node_failures
1586 }
1587 _ => continue,
1588 };
1589
1590 if failures > self.config.max_find_node_failures {
1594 self.soft_remove_node(node_id);
1595 }
1596 }
1597 }
1598
1599 fn re_ping_oldest(&mut self) {
1604 let mut nodes = self
1605 .kbuckets
1606 .iter_ref()
1607 .filter(|entry| entry.node.value.is_expired())
1608 .map(|n| n.node.value)
1609 .collect::<Vec<_>>();
1610 nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1611 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1612 for node in to_ping {
1613 self.try_ping(node, PingReason::RePing)
1614 }
1615 }
1616
1617 fn is_expired(&self, expiration: u64) -> bool {
1619 self.ensure_not_expired(expiration).is_err()
1620 }
1621
1622 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1632 let _ = i64::try_from(timestamp).map_err(drop)?;
1634
1635 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1636 if self.config.enforce_expiration_timestamps && timestamp < now {
1637 trace!(target: "discv4", "Expired packet");
1638 return Err(())
1639 }
1640 Ok(())
1641 }
1642
1643 fn ping_buffered(&mut self) {
1645 while self.pending_pings.len() < MAX_NODES_PING {
1646 match self.queued_pings.pop_front() {
1647 Some((next, reason)) => self.try_ping(next, reason),
1648 None => break,
1649 }
1650 }
1651 }
1652
1653 fn ping_expiration(&self) -> u64 {
1654 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1655 .as_secs()
1656 }
1657
1658 fn find_node_expiration(&self) -> u64 {
1659 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1660 .as_secs()
1661 }
1662
1663 fn enr_request_expiration(&self) -> u64 {
1664 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1665 .as_secs()
1666 }
1667
1668 fn send_neighbours_expiration(&self) -> u64 {
1669 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1670 .as_secs()
1671 }
1672
1673 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1679 loop {
1680 if let Some(event) = self.queued_events.pop_front() {
1682 return Poll::Ready(event)
1683 }
1684
1685 if self.config.enable_lookup {
1687 while self.lookup_interval.poll_tick(cx).is_ready() {
1688 let target = self.lookup_rotator.next(&self.local_node_record.id);
1689 self.lookup_with(target, None);
1690 }
1691 }
1692
1693 while self.ping_interval.poll_tick(cx).is_ready() {
1695 self.re_ping_oldest();
1696 }
1697
1698 if let Some(Poll::Ready(Some(ip))) =
1699 self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1700 {
1701 self.set_external_ip_addr(ip);
1702 }
1703
1704 while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1706 match cmd {
1707 Discv4Command::Add(enr) => {
1708 self.add_node(enr);
1709 }
1710 Discv4Command::Lookup { node_id, tx } => {
1711 let node_id = node_id.unwrap_or(self.local_node_record.id);
1712 self.lookup_with(node_id, tx);
1713 }
1714 Discv4Command::SetLookupInterval(duration) => {
1715 self.set_lookup_interval(duration);
1716 }
1717 Discv4Command::Updates(tx) => {
1718 let rx = self.update_stream();
1719 let _ = tx.send(rx);
1720 }
1721 Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1722 Discv4Command::Remove(node_id) => {
1723 self.remove_node(node_id);
1724 }
1725 Discv4Command::Ban(node_id, ip) => {
1726 self.ban_node(node_id);
1727 self.ban_ip(ip);
1728 }
1729 Discv4Command::BanIp(ip) => {
1730 self.ban_ip(ip);
1731 }
1732 Discv4Command::SetEIP868RLPPair { key, rlp } => {
1733 debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1734
1735 let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1736 }
1737 Discv4Command::SetTcpPort(port) => {
1738 debug!(target: "discv4", %port, "Update tcp port");
1739 self.local_node_record.tcp_port = port;
1740 if self.local_node_record.address.is_ipv4() {
1741 let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1742 } else {
1743 let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1744 }
1745 }
1746
1747 Discv4Command::Terminated => {
1748 self.queued_events.push_back(Discv4Event::Terminated);
1750 }
1751 }
1752 }
1753
1754 let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1756
1757 while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1759 match event {
1760 IngressEvent::RecvError(err) => {
1761 debug!(target: "discv4", %err, "failed to read datagram");
1762 }
1763 IngressEvent::BadPacket(from, err, data) => {
1764 trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1765 }
1766 IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1767 trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1768 let event = match msg {
1769 Message::Ping(ping) => {
1770 self.on_ping(ping, remote_addr, node_id, hash);
1771 Discv4Event::Ping
1772 }
1773 Message::Pong(pong) => {
1774 self.on_pong(pong, remote_addr, node_id);
1775 Discv4Event::Pong
1776 }
1777 Message::FindNode(msg) => {
1778 self.on_find_node(msg, remote_addr, node_id);
1779 Discv4Event::FindNode
1780 }
1781 Message::Neighbours(msg) => {
1782 self.on_neighbours(msg, remote_addr, node_id);
1783 Discv4Event::Neighbours
1784 }
1785 Message::EnrRequest(msg) => {
1786 self.on_enr_request(msg, remote_addr, node_id, hash);
1787 Discv4Event::EnrRequest
1788 }
1789 Message::EnrResponse(msg) => {
1790 self.on_enr_response(msg, remote_addr, node_id);
1791 Discv4Event::EnrResponse
1792 }
1793 };
1794
1795 self.queued_events.push_back(event);
1796 }
1797 }
1798
1799 udp_message_budget -= 1;
1800 if udp_message_budget < 0 {
1801 trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1802 if self.queued_events.is_empty() {
1803 cx.waker().wake_by_ref();
1806 }
1807 break
1808 }
1809 }
1810
1811 self.ping_buffered();
1813
1814 while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1816 self.evict_expired_requests(Instant::now());
1817 }
1818
1819 while self.expire_interval.poll_tick(cx).is_ready() {
1821 self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1822 }
1823
1824 if self.queued_events.is_empty() {
1825 return Poll::Pending
1826 }
1827 }
1828 }
1829}
1830
1831impl Stream for Discv4Service {
1833 type Item = Discv4Event;
1834
1835 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1836 match ready!(self.get_mut().poll(cx)) {
1838 Discv4Event::Terminated => Poll::Ready(None),
1840 ev => Poll::Ready(Some(ev)),
1842 }
1843 }
1844}
1845
1846impl fmt::Debug for Discv4Service {
1847 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1848 f.debug_struct("Discv4Service")
1849 .field("local_address", &self.local_address)
1850 .field("local_peer_id", &self.local_peer_id())
1851 .field("local_node_record", &self.local_node_record)
1852 .field("queued_pings", &self.queued_pings)
1853 .field("pending_lookup", &self.pending_lookup)
1854 .field("pending_find_nodes", &self.pending_find_nodes)
1855 .field("lookup_interval", &self.lookup_interval)
1856 .finish_non_exhaustive()
1857 }
1858}
1859
1860#[derive(Debug, Eq, PartialEq)]
1864pub enum Discv4Event {
1865 Ping,
1867 Pong,
1869 FindNode,
1871 Neighbours,
1873 EnrRequest,
1875 EnrResponse,
1877 Terminated,
1879}
1880
1881pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1883 let mut stream = ReceiverStream::new(rx);
1884 while let Some((payload, to)) = stream.next().await {
1885 match udp.send_to(&payload, to).await {
1886 Ok(size) => {
1887 trace!(target: "discv4", ?to, ?size,"sent payload");
1888 }
1889 Err(err) => {
1890 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1891 }
1892 }
1893 }
1894}
1895
1896const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1898
1899pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1904 let send = |event: IngressEvent| async {
1905 let _ = tx.send(event).await.map_err(|err| {
1906 debug!(
1907 target: "discv4",
1908 %err,
1909 "failed send incoming packet",
1910 )
1911 });
1912 };
1913
1914 let mut cache = ReceiveCache::default();
1915
1916 let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1918 let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1919
1920 let mut buf = [0; MAX_PACKET_SIZE];
1921 loop {
1922 let res = udp.recv_from(&mut buf).await;
1923 match res {
1924 Err(err) => {
1925 debug!(target: "discv4", %err, "Failed to read datagram.");
1926 send(IngressEvent::RecvError(err)).await;
1927 }
1928 Ok((read, remote_addr)) => {
1929 if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1931 trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1932 continue
1933 }
1934
1935 let packet = &buf[..read];
1936 match Message::decode(packet) {
1937 Ok(packet) => {
1938 if packet.node_id == local_id {
1939 debug!(target: "discv4", ?remote_addr, "Received own packet.");
1941 continue
1942 }
1943
1944 if cache.contains_packet(packet.hash) {
1946 debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1947 continue
1948 }
1949
1950 send(IngressEvent::Packet(remote_addr, packet)).await;
1951 }
1952 Err(err) => {
1953 trace!(target: "discv4", %err,"Failed to decode packet");
1954 send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1955 }
1956 }
1957 }
1958 }
1959
1960 if poll_fn(|cx| match interval.poll_tick(cx) {
1962 Poll::Ready(_) => Poll::Ready(true),
1963 Poll::Pending => Poll::Ready(false),
1964 })
1965 .await
1966 {
1967 cache.tick_ips(tick);
1968 }
1969 }
1970}
1971
1972struct ReceiveCache {
1976 ip_messages: HashMap<IpAddr, usize>,
1982 unique_packets: schnellru::LruMap<B256, ()>,
1984}
1985
1986impl ReceiveCache {
1987 fn tick_ips(&mut self, tick: usize) {
1991 self.ip_messages.retain(|_, count| {
1992 if let Some(reset) = count.checked_sub(tick) {
1993 *count = reset;
1994 true
1995 } else {
1996 false
1997 }
1998 });
1999 }
2000
2001 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2003 let ctn = self.ip_messages.entry(ip).or_default();
2004 *ctn = ctn.saturating_add(1);
2005 *ctn
2006 }
2007
2008 fn contains_packet(&mut self, hash: B256) -> bool {
2010 !self.unique_packets.insert(hash, ())
2011 }
2012}
2013
2014impl Default for ReceiveCache {
2015 fn default() -> Self {
2016 Self {
2017 ip_messages: Default::default(),
2018 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2019 }
2020 }
2021}
2022
2023enum Discv4Command {
2025 Add(NodeRecord),
2026 SetTcpPort(u16),
2027 SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2028 Ban(PeerId, IpAddr),
2029 BanPeer(PeerId),
2030 BanIp(IpAddr),
2031 Remove(PeerId),
2032 Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2033 SetLookupInterval(Duration),
2034 Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2035 Terminated,
2036}
2037
2038#[derive(Debug)]
2040pub(crate) enum IngressEvent {
2041 RecvError(io::Error),
2043 BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2045 Packet(SocketAddr, Packet),
2047}
2048
2049#[derive(Debug)]
2051struct PingRequest {
2052 sent_at: Instant,
2054 node: NodeRecord,
2056 echo_hash: B256,
2058 reason: PingReason,
2060}
2061
2062#[derive(Debug)]
2066struct LookupTargetRotator {
2067 interval: usize,
2068 counter: usize,
2069}
2070
2071impl LookupTargetRotator {
2074 const fn local_only() -> Self {
2076 Self { interval: 1, counter: 0 }
2077 }
2078}
2079
2080impl Default for LookupTargetRotator {
2081 fn default() -> Self {
2082 Self {
2083 interval: 4,
2085 counter: 3,
2086 }
2087 }
2088}
2089
2090impl LookupTargetRotator {
2091 fn next(&mut self, local: &PeerId) -> PeerId {
2093 self.counter += 1;
2094 self.counter %= self.interval;
2095 if self.counter == 0 {
2096 return *local
2097 }
2098 PeerId::random()
2099 }
2100}
2101
2102#[derive(Clone, Debug)]
2107struct LookupContext {
2108 inner: Rc<LookupContextInner>,
2109}
2110
2111impl LookupContext {
2112 fn new(
2114 target: discv5::Key<NodeKey>,
2115 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2116 listener: Option<NodeRecordSender>,
2117 ) -> Self {
2118 let closest_nodes = nearest_nodes
2119 .into_iter()
2120 .map(|(distance, record)| {
2121 (distance, QueryNode { record, queried: false, responded: false })
2122 })
2123 .collect();
2124
2125 let inner = Rc::new(LookupContextInner {
2126 target,
2127 closest_nodes: RefCell::new(closest_nodes),
2128 listener,
2129 });
2130 Self { inner }
2131 }
2132
2133 fn target(&self) -> PeerId {
2135 self.inner.target.preimage().0
2136 }
2137
2138 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2139 self.inner
2140 .closest_nodes
2141 .borrow()
2142 .iter()
2143 .filter(|(_, node)| !node.queried)
2144 .map(|(_, n)| n.record)
2145 .take(num)
2146 .collect()
2147 }
2148
2149 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2151 where
2152 P: FnMut(&NodeRecord) -> bool,
2153 {
2154 self.inner
2155 .closest_nodes
2156 .borrow()
2157 .iter()
2158 .filter(|(_, node)| !node.queried)
2159 .map(|(_, n)| n.record)
2160 .filter(filter)
2161 .take(num)
2162 .collect()
2163 }
2164
2165 fn add_node(&self, record: NodeRecord) {
2167 let distance = self.inner.target.distance(&kad_key(record.id));
2168 let mut closest = self.inner.closest_nodes.borrow_mut();
2169 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2170 entry.insert(QueryNode { record, queried: false, responded: false });
2171 }
2172 }
2173
2174 fn set_queried(&self, id: PeerId, val: bool) {
2175 if let Some((_, node)) =
2176 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2177 {
2178 node.queried = val;
2179 }
2180 }
2181
2182 fn mark_queried(&self, id: PeerId) {
2184 self.set_queried(id, true)
2185 }
2186
2187 fn unmark_queried(&self, id: PeerId) {
2189 self.set_queried(id, false)
2190 }
2191
2192 fn mark_responded(&self, id: PeerId) {
2194 if let Some((_, node)) =
2195 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2196 {
2197 node.responded = true;
2198 }
2199 }
2200}
2201
2202unsafe impl Send for LookupContext {}
2209#[derive(Debug)]
2210struct LookupContextInner {
2211 target: discv5::Key<NodeKey>,
2213 closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2215 listener: Option<NodeRecordSender>,
2220}
2221
2222impl Drop for LookupContextInner {
2223 fn drop(&mut self) {
2224 if let Some(tx) = self.listener.take() {
2225 let nodes = self
2228 .closest_nodes
2229 .take()
2230 .into_values()
2231 .filter(|node| node.responded)
2232 .map(|node| node.record)
2233 .collect();
2234 let _ = tx.send(nodes);
2235 }
2236 }
2237}
2238
2239#[derive(Debug, Clone, Copy)]
2241struct QueryNode {
2242 record: NodeRecord,
2243 queried: bool,
2244 responded: bool,
2245}
2246
2247#[derive(Debug)]
2248struct FindNodeRequest {
2249 sent_at: Instant,
2251 response_count: usize,
2253 answered: bool,
2255 lookup_context: LookupContext,
2257}
2258
2259impl FindNodeRequest {
2262 fn new(resp: LookupContext) -> Self {
2263 Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2264 }
2265}
2266
2267#[derive(Debug)]
2268struct EnrRequestState {
2269 sent_at: Instant,
2271 echo_hash: B256,
2273}
2274
2275#[derive(Debug, Clone, Eq, PartialEq)]
2277struct NodeEntry {
2278 record: NodeRecord,
2280 last_seen: Instant,
2282 last_enr_seq: Option<u64>,
2284 fork_id: Option<ForkId>,
2286 find_node_failures: u8,
2288 has_endpoint_proof: bool,
2290}
2291
2292impl NodeEntry {
2295 fn new(record: NodeRecord) -> Self {
2297 Self {
2298 record,
2299 last_seen: Instant::now(),
2300 last_enr_seq: None,
2301 fork_id: None,
2302 find_node_failures: 0,
2303 has_endpoint_proof: false,
2304 }
2305 }
2306
2307 #[cfg(test)]
2308 fn new_proven(record: NodeRecord) -> Self {
2309 let mut node = Self::new(record);
2310 node.has_endpoint_proof = true;
2311 node
2312 }
2313
2314 fn establish_proof(&mut self) {
2316 self.has_endpoint_proof = true;
2317 self.find_node_failures = 0;
2318 }
2319
2320 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2322 self.find_node_failures >= max_failures
2323 }
2324
2325 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2327 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2328 }
2329
2330 fn inc_failed_request(&mut self) {
2332 self.find_node_failures += 1;
2333 }
2334
2335 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2337 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2338 }
2339
2340 fn update_now<F, R>(&mut self, f: F) -> R
2342 where
2343 F: FnOnce(&mut Self) -> R,
2344 {
2345 self.last_seen = Instant::now();
2346 f(self)
2347 }
2348}
2349
2350impl NodeEntry {
2353 fn is_expired(&self) -> bool {
2355 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2356 }
2357}
2358
2359#[derive(Debug)]
2361enum PingReason {
2362 InitialInsert,
2364 EstablishBond,
2366 RePing,
2368 Lookup(NodeRecord, LookupContext),
2370}
2371
2372#[derive(Debug, Clone)]
2374pub enum DiscoveryUpdate {
2375 Added(NodeRecord),
2377 DiscoveredAtCapacity(NodeRecord),
2379 EnrForkId(NodeRecord, ForkId),
2381 Removed(PeerId),
2383 Batch(Vec<DiscoveryUpdate>),
2385}
2386
2387#[cfg(test)]
2388mod tests {
2389 use super::*;
2390 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2391 use alloy_primitives::hex;
2392 use alloy_rlp::{Decodable, Encodable};
2393 use rand::{thread_rng, Rng};
2394 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2395 use reth_network_peers::mainnet_nodes;
2396 use std::future::poll_fn;
2397
2398 #[tokio::test]
2399 async fn test_configured_enr_forkid_entry() {
2400 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2401 let mut disc_conf = Discv4Config::default();
2402 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2403 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2404 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2405 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2406
2407 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2408 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2409 let expected = EnrForkIdEntry {
2410 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2411 };
2412 assert_eq!(expected, fork_entry_id);
2413 assert_eq!(expected, decoded);
2414 }
2415
2416 #[test]
2417 fn test_enr_forkid_entry_decode() {
2418 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2419 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2420 let expected = EnrForkIdEntry {
2421 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2422 };
2423 assert_eq!(expected, decoded);
2424 }
2425
2426 #[test]
2427 fn test_enr_forkid_entry_encode() {
2428 let original = EnrForkIdEntry {
2429 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2430 };
2431 let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2432 let mut encoded = Vec::with_capacity(expected.len());
2433 original.encode(&mut encoded);
2434 assert_eq!(&expected[..], encoded.as_slice());
2435 }
2436
2437 #[test]
2438 fn test_local_rotator() {
2439 let id = PeerId::random();
2440 let mut rotator = LookupTargetRotator::local_only();
2441 assert_eq!(rotator.next(&id), id);
2442 assert_eq!(rotator.next(&id), id);
2443 }
2444
2445 #[test]
2446 fn test_rotator() {
2447 let id = PeerId::random();
2448 let mut rotator = LookupTargetRotator::default();
2449 assert_eq!(rotator.next(&id), id);
2450 assert_ne!(rotator.next(&id), id);
2451 assert_ne!(rotator.next(&id), id);
2452 assert_ne!(rotator.next(&id), id);
2453 assert_eq!(rotator.next(&id), id);
2454 }
2455
2456 #[tokio::test]
2457 async fn test_pending_ping() {
2458 let (_, mut service) = create_discv4().await;
2459
2460 let local_addr = service.local_addr();
2461
2462 let mut num_inserted = 0;
2463 loop {
2464 let node = NodeRecord::new(local_addr, PeerId::random());
2465 if service.add_node(node) {
2466 num_inserted += 1;
2467 assert!(service.pending_pings.contains_key(&node.id));
2468 assert_eq!(service.pending_pings.len(), num_inserted);
2469 if num_inserted == MAX_NODES_PING {
2470 break
2471 }
2472 }
2473 }
2474
2475 num_inserted = 0;
2477 for _ in 0..MAX_NODES_PING {
2478 let node = NodeRecord::new(local_addr, PeerId::random());
2479 if service.add_node(node) {
2480 num_inserted += 1;
2481 assert!(!service.pending_pings.contains_key(&node.id));
2482 assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2483 assert_eq!(service.queued_pings.len(), num_inserted);
2484 }
2485 }
2486 }
2487
2488 #[tokio::test(flavor = "multi_thread")]
2490 #[ignore]
2491 async fn test_mainnet_lookup() {
2492 reth_tracing::init_test_tracing();
2493 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2494
2495 let all_nodes = mainnet_nodes();
2496 let config = Discv4Config::builder()
2497 .add_boot_nodes(all_nodes)
2498 .lookup_interval(Duration::from_secs(1))
2499 .add_eip868_pair("eth", fork_id)
2500 .build();
2501 let (_discv4, mut service) = create_discv4_with_config(config).await;
2502
2503 let mut updates = service.update_stream();
2504
2505 let _handle = service.spawn();
2506
2507 let mut table = HashMap::new();
2508 while let Some(update) = updates.next().await {
2509 match update {
2510 DiscoveryUpdate::EnrForkId(record, fork_id) => {
2511 println!("{record:?}, {fork_id:?}");
2512 }
2513 DiscoveryUpdate::Added(record) => {
2514 table.insert(record.id, record);
2515 }
2516 DiscoveryUpdate::Removed(id) => {
2517 table.remove(&id);
2518 }
2519 _ => {}
2520 }
2521 println!("total peers {}", table.len());
2522 }
2523 }
2524
2525 #[tokio::test]
2526 async fn test_mapped_ipv4() {
2527 reth_tracing::init_test_tracing();
2528 let mut rng = thread_rng();
2529 let config = Discv4Config::builder().build();
2530 let (_discv4, mut service) = create_discv4_with_config(config).await;
2531
2532 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2533 let v6 = v4.to_ipv6_mapped();
2534 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2535
2536 let ping = Ping {
2537 from: rng_endpoint(&mut rng),
2538 to: rng_endpoint(&mut rng),
2539 expire: service.ping_expiration(),
2540 enr_sq: Some(rng.gen()),
2541 };
2542
2543 let id = PeerId::random_with(&mut rng);
2544 service.on_ping(ping, addr, id, rng.gen());
2545
2546 let key = kad_key(id);
2547 match service.kbuckets.entry(&key) {
2548 kbucket::Entry::Present(entry, _) => {
2549 let node_addr = entry.value().record.address;
2550 assert!(node_addr.is_ipv4());
2551 assert_eq!(node_addr, IpAddr::from(v4));
2552 }
2553 _ => unreachable!(),
2554 };
2555 }
2556
2557 #[tokio::test]
2558 async fn test_respect_ping_expiration() {
2559 reth_tracing::init_test_tracing();
2560 let mut rng = thread_rng();
2561 let config = Discv4Config::builder().build();
2562 let (_discv4, mut service) = create_discv4_with_config(config).await;
2563
2564 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2565 let v6 = v4.to_ipv6_mapped();
2566 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2567
2568 let ping = Ping {
2569 from: rng_endpoint(&mut rng),
2570 to: rng_endpoint(&mut rng),
2571 expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2572 enr_sq: Some(rng.gen()),
2573 };
2574
2575 let id = PeerId::random_with(&mut rng);
2576 service.on_ping(ping, addr, id, rng.gen());
2577
2578 let key = kad_key(id);
2579 match service.kbuckets.entry(&key) {
2580 kbucket::Entry::Absent(_) => {}
2581 _ => unreachable!(),
2582 };
2583 }
2584
2585 #[tokio::test]
2586 async fn test_single_lookups() {
2587 reth_tracing::init_test_tracing();
2588
2589 let config = Discv4Config::builder().build();
2590 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2591
2592 let id = PeerId::random();
2593 let key = kad_key(id);
2594 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2595
2596 let _ = service.kbuckets.insert_or_update(
2597 &key,
2598 NodeEntry::new_proven(record),
2599 NodeStatus {
2600 direction: ConnectionDirection::Incoming,
2601 state: ConnectionState::Connected,
2602 },
2603 );
2604
2605 service.lookup_self();
2606 assert_eq!(service.pending_find_nodes.len(), 1);
2607
2608 poll_fn(|cx| {
2609 let _ = service.poll(cx);
2610 assert_eq!(service.pending_find_nodes.len(), 1);
2611
2612 Poll::Ready(())
2613 })
2614 .await;
2615 }
2616
2617 #[tokio::test]
2618 async fn test_on_neighbours_recursive_lookup() {
2619 reth_tracing::init_test_tracing();
2620
2621 let config = Discv4Config::builder().build();
2622 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2623 let (_discv4, mut service2) = create_discv4_with_config(config).await;
2624
2625 let id = PeerId::random();
2626 let key = kad_key(id);
2627 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2628
2629 let _ = service.kbuckets.insert_or_update(
2630 &key,
2631 NodeEntry::new_proven(record),
2632 NodeStatus {
2633 direction: ConnectionDirection::Incoming,
2634 state: ConnectionState::Connected,
2635 },
2636 );
2637 service.lookup_self();
2640 assert_eq!(service.pending_find_nodes.len(), 1);
2641
2642 poll_fn(|cx| {
2643 let _ = service.poll(cx);
2644 assert_eq!(service.pending_find_nodes.len(), 1);
2645
2646 Poll::Ready(())
2647 })
2648 .await;
2649
2650 let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2651 10000000000000;
2652 let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2653 service.on_neighbours(msg, record.tcp_addr(), id);
2654 let event = poll_fn(|cx| service2.poll(cx)).await;
2656 assert_eq!(event, Discv4Event::Ping);
2657 assert_eq!(service.pending_find_nodes.len(), 1);
2660 let event = poll_fn(|cx| service.poll(cx)).await;
2662 assert_eq!(event, Discv4Event::Pong);
2663 let event = poll_fn(|cx| service.poll(cx)).await;
2668 assert_eq!(event, Discv4Event::Ping);
2669 assert_eq!(service.pending_find_nodes.len(), 2);
2672 }
2673
2674 #[tokio::test]
2675 async fn test_no_local_in_closest() {
2676 reth_tracing::init_test_tracing();
2677
2678 let config = Discv4Config::builder().build();
2679 let (_discv4, mut service) = create_discv4_with_config(config).await;
2680
2681 let target_key = kad_key(PeerId::random());
2682
2683 let id = PeerId::random();
2684 let key = kad_key(id);
2685 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2686
2687 let _ = service.kbuckets.insert_or_update(
2688 &key,
2689 NodeEntry::new(record),
2690 NodeStatus {
2691 direction: ConnectionDirection::Incoming,
2692 state: ConnectionState::Connected,
2693 },
2694 );
2695
2696 let closest = service
2697 .kbuckets
2698 .closest_values(&target_key)
2699 .map(|n| n.value.record)
2700 .take(MAX_NODES_PER_BUCKET)
2701 .collect::<Vec<_>>();
2702
2703 assert_eq!(closest.len(), 1);
2704 assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2705 }
2706
2707 #[tokio::test]
2708 async fn test_random_lookup() {
2709 reth_tracing::init_test_tracing();
2710
2711 let config = Discv4Config::builder().build();
2712 let (_discv4, mut service) = create_discv4_with_config(config).await;
2713
2714 let target = PeerId::random();
2715
2716 let id = PeerId::random();
2717 let key = kad_key(id);
2718 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2719
2720 let _ = service.kbuckets.insert_or_update(
2721 &key,
2722 NodeEntry::new_proven(record),
2723 NodeStatus {
2724 direction: ConnectionDirection::Incoming,
2725 state: ConnectionState::Connected,
2726 },
2727 );
2728
2729 service.lookup(target);
2730 assert_eq!(service.pending_find_nodes.len(), 1);
2731
2732 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2733
2734 assert_eq!(ctx.target(), target);
2735 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2736
2737 ctx.add_node(record);
2738 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2739 }
2740
2741 #[tokio::test]
2742 async fn test_reping_on_find_node_failures() {
2743 reth_tracing::init_test_tracing();
2744
2745 let config = Discv4Config::builder().build();
2746 let (_discv4, mut service) = create_discv4_with_config(config).await;
2747
2748 let target = PeerId::random();
2749
2750 let id = PeerId::random();
2751 let key = kad_key(id);
2752 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2753
2754 let mut entry = NodeEntry::new_proven(record);
2755 entry.find_node_failures = u8::MAX;
2756 let _ = service.kbuckets.insert_or_update(
2757 &key,
2758 entry,
2759 NodeStatus {
2760 direction: ConnectionDirection::Incoming,
2761 state: ConnectionState::Connected,
2762 },
2763 );
2764
2765 service.lookup(target);
2766 assert_eq!(service.pending_find_nodes.len(), 0);
2767 assert_eq!(service.pending_pings.len(), 1);
2768
2769 service.update_on_pong(record, None);
2770
2771 service
2772 .on_entry(record.id, |entry| {
2773 assert_eq!(entry.find_node_failures, 0);
2775 assert!(entry.has_endpoint_proof);
2776 })
2777 .unwrap();
2778 }
2779
2780 #[tokio::test]
2781 async fn test_service_commands() {
2782 reth_tracing::init_test_tracing();
2783
2784 let config = Discv4Config::builder().build();
2785 let (discv4, mut service) = create_discv4_with_config(config).await;
2786
2787 service.lookup_self();
2788
2789 let _handle = service.spawn();
2790 discv4.send_lookup_self();
2791 let _ = discv4.lookup_self().await;
2792 }
2793
2794 #[tokio::test]
2795 async fn test_requests_timeout() {
2796 reth_tracing::init_test_tracing();
2797 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2798
2799 let config = Discv4Config::builder()
2800 .request_timeout(Duration::from_millis(200))
2801 .ping_expiration(Duration::from_millis(200))
2802 .lookup_neighbours_expiration(Duration::from_millis(200))
2803 .add_eip868_pair("eth", fork_id)
2804 .build();
2805 let (_disv4, mut service) = create_discv4_with_config(config).await;
2806
2807 let id = PeerId::random();
2808 let key = kad_key(id);
2809 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2810
2811 let _ = service.kbuckets.insert_or_update(
2812 &key,
2813 NodeEntry::new_proven(record),
2814 NodeStatus {
2815 direction: ConnectionDirection::Incoming,
2816 state: ConnectionState::Connected,
2817 },
2818 );
2819
2820 service.lookup_self();
2821 assert_eq!(service.pending_find_nodes.len(), 1);
2822
2823 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2824
2825 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2826
2827 assert_eq!(service.pending_lookup.len(), 1);
2828
2829 let ping = Ping {
2830 from: service.local_node_record.into(),
2831 to: record.into(),
2832 expire: service.ping_expiration(),
2833 enr_sq: service.enr_seq(),
2834 };
2835 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2836 let ping_request = PingRequest {
2837 sent_at: Instant::now(),
2838 node: record,
2839 echo_hash,
2840 reason: PingReason::InitialInsert,
2841 };
2842 service.pending_pings.insert(record.id, ping_request);
2843
2844 assert_eq!(service.pending_pings.len(), 1);
2845
2846 tokio::time::sleep(Duration::from_secs(1)).await;
2847
2848 poll_fn(|cx| {
2849 let _ = service.poll(cx);
2850
2851 assert_eq!(service.pending_find_nodes.len(), 0);
2852 assert_eq!(service.pending_lookup.len(), 0);
2853 assert_eq!(service.pending_pings.len(), 0);
2854
2855 Poll::Ready(())
2856 })
2857 .await;
2858 }
2859
2860 #[tokio::test(flavor = "multi_thread")]
2862 async fn test_check_wrong_to() {
2863 reth_tracing::init_test_tracing();
2864
2865 let config = Discv4Config::builder().external_ip_resolver(None).build();
2866 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2867 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2868
2869 let mut ping = Ping {
2871 from: service_1.local_node_record.into(),
2872 to: service_2.local_node_record.into(),
2873 expire: service_1.ping_expiration(),
2874 enr_sq: service_1.enr_seq(),
2875 };
2876 ping.to.address = "192.0.2.0".parse().unwrap();
2877
2878 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2879 let ping_request = PingRequest {
2880 sent_at: Instant::now(),
2881 node: service_2.local_node_record,
2882 echo_hash,
2883 reason: PingReason::InitialInsert,
2884 };
2885 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2886
2887 let event = poll_fn(|cx| service_2.poll(cx)).await;
2889 assert_eq!(event, Discv4Event::Ping);
2890
2891 let event = poll_fn(|cx| service_1.poll(cx)).await;
2893 assert_eq!(event, Discv4Event::Pong);
2894 let event = poll_fn(|cx| service_1.poll(cx)).await;
2896 assert_eq!(event, Discv4Event::Ping);
2897 }
2898
2899 #[tokio::test(flavor = "multi_thread")]
2900 async fn test_check_ping_pong() {
2901 reth_tracing::init_test_tracing();
2902
2903 let config = Discv4Config::builder().external_ip_resolver(None).build();
2904 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2905 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2906
2907 service_1.add_node(service_2.local_node_record);
2909
2910 let event = poll_fn(|cx| service_2.poll(cx)).await;
2912 assert_eq!(event, Discv4Event::Ping);
2913
2914 let key1 = kad_key(*service_1.local_peer_id());
2916 match service_2.kbuckets.entry(&key1) {
2917 kbucket::Entry::Present(_entry, status) => {
2918 assert!(!status.is_connected());
2919 }
2920 _ => unreachable!(),
2921 }
2922
2923 let event = poll_fn(|cx| service_1.poll(cx)).await;
2925 assert_eq!(event, Discv4Event::Pong);
2926
2927 let key2 = kad_key(*service_2.local_peer_id());
2929 match service_1.kbuckets.entry(&key2) {
2930 kbucket::Entry::Present(_entry, status) => {
2931 assert!(status.is_connected());
2932 }
2933 _ => unreachable!(),
2934 }
2935
2936 let event = poll_fn(|cx| service_1.poll(cx)).await;
2938 assert_eq!(event, Discv4Event::Ping);
2939
2940 let event = poll_fn(|cx| service_2.poll(cx)).await;
2942
2943 match event {
2944 Discv4Event::EnrRequest => {
2945 let event = poll_fn(|cx| service_2.poll(cx)).await;
2947 match event {
2948 Discv4Event::EnrRequest => {
2949 let event = poll_fn(|cx| service_2.poll(cx)).await;
2950 assert_eq!(event, Discv4Event::Pong);
2951 }
2952 Discv4Event::Pong => {}
2953 _ => {
2954 unreachable!()
2955 }
2956 }
2957 }
2958 Discv4Event::Pong => {}
2959 ev => unreachable!("{ev:?}"),
2960 }
2961
2962 match service_2.kbuckets.entry(&key1) {
2964 kbucket::Entry::Present(_entry, status) => {
2965 assert!(status.is_connected());
2966 }
2967 ev => unreachable!("{ev:?}"),
2968 }
2969 }
2970
2971 #[test]
2972 fn test_insert() {
2973 let local_node_record = rng_record(&mut rand::thread_rng());
2974 let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
2975 NodeKey::from(&local_node_record).into(),
2976 Duration::from_secs(60),
2977 MAX_NODES_PER_BUCKET,
2978 None,
2979 None,
2980 );
2981
2982 let new_record = rng_record(&mut rand::thread_rng());
2983 let key = kad_key(new_record.id);
2984 match kbuckets.entry(&key) {
2985 kbucket::Entry::Absent(entry) => {
2986 let node = NodeEntry::new(new_record);
2987 let _ = entry.insert(
2988 node,
2989 NodeStatus {
2990 direction: ConnectionDirection::Outgoing,
2991 state: ConnectionState::Disconnected,
2992 },
2993 );
2994 }
2995 _ => {
2996 unreachable!()
2997 }
2998 };
2999 match kbuckets.entry(&key) {
3000 kbucket::Entry::Present(_, _) => {}
3001 _ => {
3002 unreachable!()
3003 }
3004 }
3005 }
3006}