1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7};
8use futures::StreamExt;
9use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
10use reth_ethereum_forks::ForkId;
11use reth_net_banlist::BanList;
12use reth_network_api::test_utils::{PeerCommand, PeersHandle};
13use reth_network_peers::{NodeRecord, PeerId};
14use reth_network_types::{
15 peers::{
16 config::PeerBackoffDurations,
17 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
18 },
19 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
20 ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
21};
22use std::{
23 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
24 fmt::Display,
25 io::{self},
26 net::{IpAddr, SocketAddr},
27 task::{Context, Poll},
28 time::Duration,
29};
30use thiserror::Error;
31use tokio::{
32 sync::mpsc,
33 time::{Instant, Interval},
34};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36use tracing::{trace, warn};
37
38#[derive(Debug)]
45pub struct PeersManager {
46 peers: HashMap<PeerId, Peer>,
48 trusted_peer_ids: HashSet<PeerId>,
53 manager_tx: mpsc::UnboundedSender<PeerCommand>,
55 handle_rx: UnboundedReceiverStream<PeerCommand>,
57 queued_actions: VecDeque<PeerAction>,
59 refill_slots_interval: Interval,
61 reputation_weights: ReputationChangeWeights,
63 connection_info: ConnectionInfo,
65 ban_list: BanList,
67 backed_off_peers: HashMap<PeerId, std::time::Instant>,
69 release_interval: Interval,
71 ban_duration: Duration,
73 backoff_durations: PeerBackoffDurations,
76 trusted_nodes_only: bool,
79 last_tick: Instant,
81 max_backoff_count: u8,
83 net_connection_state: NetworkConnectionState,
85 incoming_ip_throttle_duration: Duration,
87}
88
89impl PeersManager {
90 pub fn new(config: PeersConfig) -> Self {
92 let PeersConfig {
93 refill_slots_interval,
94 connection_info,
95 reputation_weights,
96 ban_list,
97 ban_duration,
98 backoff_durations,
99 trusted_nodes,
100 trusted_nodes_only,
101 basic_nodes,
102 max_backoff_count,
103 incoming_ip_throttle_duration,
104 } = config;
105 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
106 let now = Instant::now();
107
108 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
110
111 let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
112 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
113
114 for trusted_peer in trusted_nodes {
115 match trusted_peer.resolve_blocking() {
116 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
117 trusted_peer_ids.insert(id);
118 peers.entry(id).or_insert_with(|| {
119 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
120 });
121 }
122 Err(err) => {
123 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
124 }
125 }
126 }
127
128 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
129 peers.entry(id).or_insert_with(|| {
130 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
131 });
132 }
133
134 Self {
135 peers,
136 trusted_peer_ids,
137 manager_tx,
138 handle_rx: UnboundedReceiverStream::new(handle_rx),
139 queued_actions: Default::default(),
140 reputation_weights,
141 refill_slots_interval: tokio::time::interval(refill_slots_interval),
142 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
143 connection_info: ConnectionInfo::new(connection_info),
144 ban_list,
145 backed_off_peers: Default::default(),
146 ban_duration,
147 backoff_durations,
148 trusted_nodes_only,
149 last_tick: Instant::now(),
150 max_backoff_count,
151 net_connection_state: NetworkConnectionState::default(),
152 incoming_ip_throttle_duration,
153 }
154 }
155
156 pub(crate) fn handle(&self) -> PeersHandle {
158 PeersHandle::new(self.manager_tx.clone())
159 }
160
161 #[inline]
163 pub(crate) fn num_known_peers(&self) -> usize {
164 self.peers.len()
165 }
166
167 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
169 self.peers.iter().map(|(peer_id, v)| {
170 NodeRecord::new_with_ports(
171 v.addr.tcp().ip(),
172 v.addr.tcp().port(),
173 v.addr.udp().map(|addr| addr.port()),
174 *peer_id,
175 )
176 })
177 }
178
179 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
181 self.peers.get(&peer_id).map(|v| {
182 (
183 NodeRecord::new_with_ports(
184 v.addr.tcp().ip(),
185 v.addr.tcp().port(),
186 v.addr.udp().map(|addr| addr.port()),
187 peer_id,
188 ),
189 v.kind,
190 )
191 })
192 }
193
194 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
196 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
197 }
198
199 #[inline]
201 pub(crate) const fn num_inbound_connections(&self) -> usize {
202 self.connection_info.num_inbound
203 }
204
205 #[inline]
207 pub(crate) const fn num_outbound_connections(&self) -> usize {
208 self.connection_info.num_outbound
209 }
210
211 #[inline]
213 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
214 self.connection_info.num_pending_out
215 }
216
217 #[inline]
219 pub(crate) fn num_backed_off_peers(&self) -> usize {
220 self.backed_off_peers.len()
221 }
222
223 fn num_idle_trusted_peers(&self) -> usize {
225 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
226 }
227
228 pub(crate) fn on_incoming_pending_session(
232 &mut self,
233 addr: IpAddr,
234 ) -> Result<(), InboundConnectionError> {
235 if self.ban_list.is_banned_ip(&addr) {
236 return Err(InboundConnectionError::IpBanned)
237 }
238
239 if !self.connection_info.has_in_capacity() {
241 if self.trusted_peer_ids.is_empty() {
242 return Err(InboundConnectionError::ExceedsCapacity)
245 }
246
247 let num_idle_trusted_peers = self.num_idle_trusted_peers();
251 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
252 let max_inbound =
254 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
255 if self.connection_info.num_pending_in <= max_inbound {
256 self.connection_info.inc_pending_in();
257 }
258 return Ok(())
259 }
260
261 return Err(InboundConnectionError::ExceedsCapacity)
263 }
264
265 if !self.connection_info.has_in_pending_capacity() {
267 return Err(InboundConnectionError::ExceedsCapacity)
268 }
269
270 self.throttle_incoming_ip(addr);
272
273 self.connection_info.inc_pending_in();
274 Ok(())
275 }
276
277 pub(crate) fn on_incoming_pending_session_rejected_internally(&mut self) {
280 self.connection_info.decr_pending_in();
281 }
282
283 pub(crate) fn on_incoming_pending_session_gracefully_closed(&mut self) {
285 self.connection_info.decr_pending_in()
286 }
287
288 pub(crate) fn on_incoming_pending_session_dropped(
290 &mut self,
291 remote_addr: SocketAddr,
292 err: &PendingSessionHandshakeError,
293 ) {
294 if err.is_fatal_protocol_error() {
295 self.ban_ip(remote_addr.ip());
296
297 if err.merits_discovery_ban() {
298 self.queued_actions
299 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
300 }
301 }
302
303 self.connection_info.decr_pending_in();
304 }
305
306 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
313 self.connection_info.decr_pending_in();
314
315 if self.ban_list.is_banned_peer(&peer_id) {
318 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
319 return
320 }
321
322 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
324 if self.trusted_nodes_only && !is_trusted {
325 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
326 return
327 }
328
329 self.tick();
331
332 let has_in_capacity = self.connection_info.has_in_capacity();
333 self.connection_info.inc_in();
334
335 match self.peers.entry(peer_id) {
336 Entry::Occupied(mut entry) => {
337 let peer = entry.get_mut();
338 if peer.is_banned() {
339 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
340 return
341 }
342 if peer.state.is_pending_out() {
345 self.connection_info.decr_state(peer.state);
346 }
347
348 peer.state = PeerConnectionState::In;
349
350 is_trusted = is_trusted || peer.is_trusted();
351 }
352 Entry::Vacant(entry) => {
353 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
356 peer.remove_after_disconnect = true;
357 entry.insert(peer);
358 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
359 }
360 }
361
362 if !is_trusted && !has_in_capacity {
364 self.queued_actions.push_back(PeerAction::Disconnect {
365 peer_id,
366 reason: Some(DisconnectReason::TooManyPeers),
367 });
368 }
369 }
370
371 fn ban_peer(&mut self, peer_id: PeerId) {
373 let mut ban_duration = self.ban_duration;
374 if let Some(peer) = self.peers.get(&peer_id) {
375 if peer.is_trusted() || peer.is_static() {
376 ban_duration = self.backoff_durations.low / 2;
379 }
380 }
381
382 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
383 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
384 }
385
386 fn ban_ip(&mut self, ip: IpAddr) {
388 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
389 }
390
391 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
393 self.ban_list
394 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
395 }
396
397 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
399 trace!(target: "net::peers", ?peer_id, "backing off");
400
401 if let Some(peer) = self.peers.get_mut(&peer_id) {
402 peer.backed_off = true;
403 self.backed_off_peers.insert(peer_id, until);
404 }
405 }
406
407 fn unban_peer(&mut self, peer_id: PeerId) {
409 self.ban_list.unban_peer(&peer_id);
410 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
411 }
412
413 fn tick(&mut self) {
418 let now = Instant::now();
419 let secs_since_last_tick =
423 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
424 self.last_tick = now;
425
426 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
428 if peer.1.reputation < DEFAULT_REPUTATION {
431 peer.1.reputation += secs_since_last_tick;
432 }
433 }
434 }
435
436 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
438 self.peers.get(peer_id).map(|peer| peer.reputation)
439 }
440
441 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
447 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
448 if rep.is_reset() {
450 peer.reset_reputation()
451 } else {
452 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
453 if peer.is_trusted() || peer.is_static() {
454 if matches!(
456 rep,
457 ReputationChangeKind::Dropped |
458 ReputationChangeKind::BadAnnouncement |
459 ReputationChangeKind::Timeout |
460 ReputationChangeKind::AlreadySeenTransaction
461 ) {
462 return
463 }
464
465 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
467 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
469 }
470 }
471 peer.apply_reputation(reputation_change)
472 }
473 } else {
474 return
475 };
476
477 match outcome {
478 ReputationChangeOutcome::None => {}
479 ReputationChangeOutcome::Ban => {
480 self.ban_peer(*peer_id);
481 }
482 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
483 ReputationChangeOutcome::DisconnectAndBan => {
484 self.queued_actions.push_back(PeerAction::Disconnect {
485 peer_id: *peer_id,
486 reason: Some(DisconnectReason::DisconnectRequested),
487 });
488 self.ban_peer(*peer_id);
489 }
490 }
491 }
492
493 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
495 if let Some(peer) = self.peers.get_mut(peer_id) {
496 self.connection_info.decr_state(peer.state);
497 peer.state = PeerConnectionState::Idle;
498 }
499 }
500
501 pub(crate) fn on_outgoing_pending_session_dropped(
504 &mut self,
505 remote_addr: &SocketAddr,
506 peer_id: &PeerId,
507 err: &PendingSessionHandshakeError,
508 ) {
509 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
510 }
511
512 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
514 match self.peers.entry(peer_id) {
515 Entry::Occupied(mut entry) => {
516 self.connection_info.decr_state(entry.get().state);
517
518 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
519 entry.remove();
521 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
522 } else {
523 entry.get_mut().severe_backoff_counter = 0;
527 entry.get_mut().state = PeerConnectionState::Idle;
528 return
529 }
530 }
531 Entry::Vacant(_) => return,
532 }
533
534 self.fill_outbound_slots();
535 }
536
537 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
539 if let Some(peer) = self.peers.get_mut(&peer_id) {
540 self.connection_info.decr_state(peer.state);
541 self.connection_info.inc_out();
542 peer.state = PeerConnectionState::Out;
543 }
544 }
545
546 pub(crate) fn on_active_session_dropped(
551 &mut self,
552 remote_addr: &SocketAddr,
553 peer_id: &PeerId,
554 err: &EthStreamError,
555 ) {
556 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
557 }
558
559 pub(crate) fn on_outgoing_connection_failure(
562 &mut self,
563 remote_addr: &SocketAddr,
564 peer_id: &PeerId,
565 err: &io::Error,
566 ) {
567 if let Some(peer) = self.peers.get(peer_id) {
571 if peer.state.is_incoming() {
572 return
574 }
575 }
576
577 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
578 }
579
580 fn on_connection_failure(
581 &mut self,
582 remote_addr: &SocketAddr,
583 peer_id: &PeerId,
584 err: impl SessionError,
585 reputation_change: ReputationChangeKind,
586 ) {
587 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
588
589 if err.is_fatal_protocol_error() {
590 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
591 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
594 self.connection_info.decr_state(entry.get().state);
595 if entry.get().is_trusted() {
597 entry.get_mut().state = PeerConnectionState::Idle;
598 } else {
599 entry.remove();
600 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
601 if err.merits_discovery_ban() {
603 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
604 peer_id: *peer_id,
605 ip_addr: remote_addr.ip(),
606 })
607 }
608 }
609 }
610
611 self.ban_peer(*peer_id);
613 } else {
614 let mut backoff_until = None;
615 let mut remove_peer = false;
616
617 if let Some(peer) = self.peers.get_mut(peer_id) {
618 if let Some(kind) = err.should_backoff() {
619 if kind.is_severe() {
621 peer.severe_backoff_counter = peer.severe_backoff_counter.saturating_add(1);
622 }
623
624 let backoff_time =
625 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
626
627 backoff_until = Some(backoff_time);
631 } else {
632 let reputation_change = self.reputation_weights.change(reputation_change);
634 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
635 };
636
637 self.connection_info.decr_state(peer.state);
638 peer.state = PeerConnectionState::Idle;
639
640 if peer.severe_backoff_counter > self.max_backoff_count && !peer.is_trusted() {
641 remove_peer = true;
644 }
645 }
646
647 if remove_peer {
649 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
650 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
651 } else if let Some(backoff_until) = backoff_until {
652 self.backoff_peer_until(*peer_id, backoff_until);
654 }
655 }
656
657 self.fill_outbound_slots();
658 }
659
660 pub(crate) fn on_already_connected(&mut self, direction: Direction) {
666 match direction {
667 Direction::Incoming => {
668 self.connection_info.decr_pending_in();
670 }
671 Direction::Outgoing(_) => {
672 }
675 }
676 }
677
678 pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
683 if let Some(peer) = self.peers.get_mut(&peer_id) {
684 trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
685 peer.fork_id = Some(fork_id);
686 }
687 }
688
689 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
693 self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
694 }
695
696 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
698 self.trusted_peer_ids.insert(peer_id);
699 }
700
701 #[allow(dead_code)]
705 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
706 self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
707 }
708
709 pub(crate) fn add_peer_kind(
713 &mut self,
714 peer_id: PeerId,
715 kind: PeerKind,
716 addr: PeerAddr,
717 fork_id: Option<ForkId>,
718 ) {
719 if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
720 return
721 }
722
723 match self.peers.entry(peer_id) {
724 Entry::Occupied(mut entry) => {
725 let peer = entry.get_mut();
726 peer.kind = kind;
727 peer.fork_id = fork_id;
728 peer.addr = addr;
729
730 if peer.state.is_incoming() {
731 peer.remove_after_disconnect = false;
735 }
736 }
737 Entry::Vacant(entry) => {
738 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
739 let mut peer = Peer::with_kind(addr, kind);
740 peer.fork_id = fork_id;
741 entry.insert(peer);
742 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
743 }
744 }
745
746 if kind.is_trusted() {
747 self.trusted_peer_ids.insert(peer_id);
748 }
749 }
750
751 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
753 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
754 if entry.get().is_trusted() {
755 return
756 }
757 let mut peer = entry.remove();
758
759 trace!(target: "net::peers", ?peer_id, "remove discovered node");
760 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
761
762 if peer.state.is_connected() {
763 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
764 peer.remove_after_disconnect = true;
769 peer.state.disconnect();
770 self.peers.insert(peer_id, peer);
771 self.queued_actions.push_back(PeerAction::Disconnect {
772 peer_id,
773 reason: Some(DisconnectReason::DisconnectRequested),
774 })
775 }
776 }
777
778 #[allow(dead_code)]
781 pub(crate) fn add_and_connect(
782 &mut self,
783 peer_id: PeerId,
784 addr: PeerAddr,
785 fork_id: Option<ForkId>,
786 ) {
787 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
788 }
789
790 pub(crate) fn add_and_connect_kind(
792 &mut self,
793 peer_id: PeerId,
794 kind: PeerKind,
795 addr: PeerAddr,
796 fork_id: Option<ForkId>,
797 ) {
798 if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
799 return
800 }
801
802 match self.peers.entry(peer_id) {
803 Entry::Vacant(entry) => {
804 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
805 let mut peer = Peer::with_kind(addr, kind);
806 peer.state = PeerConnectionState::PendingOut;
807 peer.fork_id = fork_id;
808 entry.insert(peer);
809 self.queued_actions
810 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
811 }
812 _ => return,
813 }
814
815 if kind.is_trusted() {
816 self.trusted_peer_ids.insert(peer_id);
817 }
818 }
819
820 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
822 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
823 if !entry.get().is_trusted() {
824 return
825 }
826
827 let peer = entry.get_mut();
828 peer.kind = PeerKind::Basic;
829
830 self.trusted_peer_ids.remove(&peer_id);
831 }
832
833 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
843 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
844 !peer.is_backed_off() &&
845 !peer.is_banned() &&
846 peer.state.is_unconnected() &&
847 (!self.trusted_nodes_only || peer.is_trusted())
848 });
849
850 let mut best_peer = unconnected.next()?;
852
853 if best_peer.1.is_trusted() || best_peer.1.is_static() {
854 return Some((*best_peer.0, best_peer.1))
855 }
856
857 for maybe_better in unconnected {
858 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
860 return Some((*maybe_better.0, maybe_better.1))
861 }
862
863 if maybe_better.1.reputation > best_peer.1.reputation {
865 best_peer = maybe_better;
866 }
867 }
868 Some((*best_peer.0, best_peer.1))
869 }
870
871 fn fill_outbound_slots(&mut self) {
877 self.tick();
878
879 if !self.net_connection_state.is_active() {
880 return
882 }
883
884 while self.connection_info.has_out_capacity() {
886 let action = {
887 let (peer_id, peer) = match self.best_unconnected() {
888 Some(peer) => peer,
889 _ => break,
890 };
891
892 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
893
894 peer.state = PeerConnectionState::PendingOut;
895 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
896 };
897
898 self.connection_info.inc_pending_out();
899
900 self.queued_actions.push_back(action);
901 }
902 }
903
904 pub fn on_network_state_change(&mut self, state: NetworkConnectionState) {
906 self.net_connection_state = state;
907 }
908
909 pub const fn connection_state(&self) -> &NetworkConnectionState {
911 &self.net_connection_state
912 }
913
914 pub fn on_shutdown(&mut self) {
916 self.net_connection_state = NetworkConnectionState::ShuttingDown;
917 }
918
919 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
924 loop {
925 if let Some(action) = self.queued_actions.pop_front() {
927 return Poll::Ready(action)
928 }
929
930 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
931 match cmd {
932 PeerCommand::Add(peer_id, addr) => {
933 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
934 }
935 PeerCommand::Remove(peer) => self.remove_peer(peer),
936 PeerCommand::ReputationChange(peer_id, rep) => {
937 self.apply_reputation_change(&peer_id, rep)
938 }
939 PeerCommand::GetPeer(peer, tx) => {
940 let _ = tx.send(self.peers.get(&peer).cloned());
941 }
942 PeerCommand::GetPeers(tx) => {
943 let _ = tx.send(self.iter_peers().collect());
944 }
945 }
946 }
947
948 if self.release_interval.poll_tick(cx).is_ready() {
949 let now = std::time::Instant::now();
950 let (_, unbanned_peers) = self.ban_list.evict(now);
951
952 for peer_id in unbanned_peers {
953 if let Some(peer) = self.peers.get_mut(&peer_id) {
954 peer.unban();
955 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
956 }
957 }
958
959 self.backed_off_peers.retain(|peer_id, until| {
962 if now > *until {
963 if let Some(peer) = self.peers.get_mut(peer_id) {
964 peer.backed_off = false;
965 }
966 return false
967 }
968 true
969 })
970 }
971
972 while self.refill_slots_interval.poll_tick(cx).is_ready() {
973 self.fill_outbound_slots();
974 }
975
976 if self.queued_actions.is_empty() {
977 return Poll::Pending
978 }
979 }
980 }
981}
982
983impl Default for PeersManager {
984 fn default() -> Self {
985 Self::new(Default::default())
986 }
987}
988
989#[derive(Debug, Clone, PartialEq, Eq, Default)]
991pub struct ConnectionInfo {
992 num_outbound: usize,
994 num_pending_out: usize,
996 num_inbound: usize,
998 num_pending_in: usize,
1000 config: ConnectionsConfig,
1002}
1003
1004impl ConnectionInfo {
1007 const fn new(config: ConnectionsConfig) -> Self {
1009 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1010 }
1011
1012 const fn has_out_capacity(&self) -> bool {
1014 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1015 self.num_outbound < self.config.max_outbound
1016 }
1017
1018 const fn has_in_capacity(&self) -> bool {
1020 self.num_inbound < self.config.max_inbound
1021 }
1022
1023 const fn has_in_pending_capacity(&self) -> bool {
1025 self.num_pending_in < self.config.max_inbound
1026 }
1027
1028 fn decr_state(&mut self, state: PeerConnectionState) {
1029 match state {
1030 PeerConnectionState::Idle => {}
1031 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1032 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1033 PeerConnectionState::PendingOut => self.decr_pending_out(),
1034 }
1035 }
1036
1037 fn decr_out(&mut self) {
1038 self.num_outbound -= 1;
1039 }
1040
1041 fn inc_out(&mut self) {
1042 self.num_outbound += 1;
1043 }
1044
1045 fn inc_pending_out(&mut self) {
1046 self.num_pending_out += 1;
1047 }
1048
1049 fn inc_in(&mut self) {
1050 self.num_inbound += 1;
1051 }
1052
1053 fn inc_pending_in(&mut self) {
1054 self.num_pending_in += 1;
1055 }
1056
1057 fn decr_in(&mut self) {
1058 self.num_inbound -= 1;
1059 }
1060
1061 fn decr_pending_out(&mut self) {
1062 self.num_pending_out -= 1;
1063 }
1064
1065 fn decr_pending_in(&mut self) {
1066 self.num_pending_in -= 1;
1067 }
1068}
1069
1070#[derive(Debug)]
1072pub enum PeerAction {
1073 Connect {
1075 peer_id: PeerId,
1077 remote_addr: SocketAddr,
1079 },
1080 Disconnect {
1082 peer_id: PeerId,
1084 reason: Option<DisconnectReason>,
1086 },
1087 DisconnectBannedIncoming {
1090 peer_id: PeerId,
1092 },
1093 DisconnectUntrustedIncoming {
1095 peer_id: PeerId,
1097 },
1098 DiscoveryBanPeerId {
1100 peer_id: PeerId,
1102 ip_addr: IpAddr,
1104 },
1105 DiscoveryBanIp {
1107 ip_addr: IpAddr,
1109 },
1110 BanPeer {
1112 peer_id: PeerId,
1114 },
1115 UnBanPeer {
1117 peer_id: PeerId,
1119 },
1120 PeerAdded(PeerId),
1122 PeerRemoved(PeerId),
1124}
1125
1126#[derive(Debug, Error, PartialEq, Eq)]
1128pub enum InboundConnectionError {
1129 IpBanned,
1131 ExceedsCapacity,
1133}
1134
1135impl Display for InboundConnectionError {
1136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1137 write!(f, "{self:?}")
1138 }
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143 use alloy_primitives::B512;
1144 use reth_eth_wire::{
1145 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1146 DisconnectReason,
1147 };
1148 use reth_net_banlist::BanList;
1149 use reth_network_api::Direction;
1150 use reth_network_peers::{PeerId, TrustedPeer};
1151 use reth_network_types::{
1152 peers::reputation::DEFAULT_REPUTATION, BackoffKind, ReputationChangeKind,
1153 };
1154 use std::{
1155 future::{poll_fn, Future},
1156 io,
1157 net::{IpAddr, Ipv4Addr, SocketAddr},
1158 pin::Pin,
1159 task::{Context, Poll},
1160 time::Duration,
1161 };
1162 use url::Host;
1163
1164 use super::PeersManager;
1165 use crate::{
1166 error::SessionError,
1167 peers::{
1168 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1169 PeerConnectionState,
1170 },
1171 session::PendingSessionHandshakeError,
1172 PeersConfig,
1173 };
1174
1175 struct PeerActionFuture<'a> {
1176 peers: &'a mut PeersManager,
1177 }
1178
1179 impl Future for PeerActionFuture<'_> {
1180 type Output = PeerAction;
1181
1182 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1183 self.get_mut().peers.poll(cx)
1184 }
1185 }
1186
1187 macro_rules! event {
1188 ($peers:expr) => {
1189 PeerActionFuture { peers: &mut $peers }.await
1190 };
1191 }
1192
1193 #[tokio::test]
1194 async fn test_insert() {
1195 let peer = PeerId::random();
1196 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1197 let mut peers = PeersManager::default();
1198 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1199
1200 match event!(peers) {
1201 PeerAction::PeerAdded(peer_id) => {
1202 assert_eq!(peer_id, peer);
1203 }
1204 _ => unreachable!(),
1205 }
1206 match event!(peers) {
1207 PeerAction::Connect { peer_id, remote_addr } => {
1208 assert_eq!(peer_id, peer);
1209 assert_eq!(remote_addr, socket_addr);
1210 }
1211 _ => unreachable!(),
1212 }
1213
1214 let (record, _) = peers.peer_by_id(peer).unwrap();
1215 assert_eq!(record.tcp_addr(), socket_addr);
1216 assert_eq!(record.udp_addr(), socket_addr);
1217 }
1218
1219 #[tokio::test]
1220 async fn test_insert_udp() {
1221 let peer = PeerId::random();
1222 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1223 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1224 let mut peers = PeersManager::default();
1225 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1226
1227 match event!(peers) {
1228 PeerAction::PeerAdded(peer_id) => {
1229 assert_eq!(peer_id, peer);
1230 }
1231 _ => unreachable!(),
1232 }
1233 match event!(peers) {
1234 PeerAction::Connect { peer_id, remote_addr } => {
1235 assert_eq!(peer_id, peer);
1236 assert_eq!(remote_addr, tcp_addr);
1237 }
1238 _ => unreachable!(),
1239 }
1240
1241 let (record, _) = peers.peer_by_id(peer).unwrap();
1242 assert_eq!(record.tcp_addr(), tcp_addr);
1243 assert_eq!(record.udp_addr(), udp_addr);
1244 }
1245
1246 #[tokio::test]
1247 async fn test_ban() {
1248 let peer = PeerId::random();
1249 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1250 let mut peers = PeersManager::default();
1251 peers.ban_peer(peer);
1252 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1253
1254 match event!(peers) {
1255 PeerAction::BanPeer { peer_id } => {
1256 assert_eq!(peer_id, peer);
1257 }
1258 _ => unreachable!(),
1259 }
1260
1261 poll_fn(|cx| {
1262 assert!(peers.poll(cx).is_pending());
1263 Poll::Ready(())
1264 })
1265 .await;
1266 }
1267
1268 #[tokio::test]
1269 async fn test_unban() {
1270 let peer = PeerId::random();
1271 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1272 let mut peers = PeersManager::default();
1273 peers.ban_peer(peer);
1274 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1275
1276 match event!(peers) {
1277 PeerAction::BanPeer { peer_id } => {
1278 assert_eq!(peer_id, peer);
1279 }
1280 _ => unreachable!(),
1281 }
1282
1283 poll_fn(|cx| {
1284 assert!(peers.poll(cx).is_pending());
1285 Poll::Ready(())
1286 })
1287 .await;
1288
1289 peers.unban_peer(peer);
1290
1291 match event!(peers) {
1292 PeerAction::UnBanPeer { peer_id } => {
1293 assert_eq!(peer_id, peer);
1294 }
1295 _ => unreachable!(),
1296 }
1297
1298 poll_fn(|cx| {
1299 assert!(peers.poll(cx).is_pending());
1300 Poll::Ready(())
1301 })
1302 .await;
1303 }
1304
1305 #[tokio::test]
1306 async fn test_backoff_on_busy() {
1307 let peer = PeerId::random();
1308 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1309
1310 let mut peers = PeersManager::new(PeersConfig::test());
1311 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1312
1313 match event!(peers) {
1314 PeerAction::PeerAdded(peer_id) => {
1315 assert_eq!(peer_id, peer);
1316 }
1317 _ => unreachable!(),
1318 }
1319 match event!(peers) {
1320 PeerAction::Connect { peer_id, .. } => {
1321 assert_eq!(peer_id, peer);
1322 }
1323 _ => unreachable!(),
1324 }
1325
1326 poll_fn(|cx| {
1327 assert!(peers.poll(cx).is_pending());
1328 Poll::Ready(())
1329 })
1330 .await;
1331
1332 peers.on_active_session_dropped(
1333 &socket_addr,
1334 &peer,
1335 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1336 DisconnectReason::TooManyPeers,
1337 )),
1338 );
1339
1340 poll_fn(|cx| {
1341 assert!(peers.poll(cx).is_pending());
1342 Poll::Ready(())
1343 })
1344 .await;
1345
1346 assert!(peers.backed_off_peers.contains_key(&peer));
1347 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1348
1349 tokio::time::sleep(peers.backoff_durations.low).await;
1350
1351 match event!(peers) {
1352 PeerAction::Connect { peer_id, .. } => {
1353 assert_eq!(peer_id, peer);
1354 }
1355 _ => unreachable!(),
1356 }
1357
1358 assert!(!peers.backed_off_peers.contains_key(&peer));
1359 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1360 }
1361
1362 #[tokio::test]
1363 async fn test_backoff_on_no_response() {
1364 let peer = PeerId::random();
1365 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1366
1367 let backoff_durations = PeerBackoffDurations::test();
1368 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1369 let mut peers = PeersManager::new(config);
1370 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1371
1372 match event!(peers) {
1373 PeerAction::PeerAdded(peer_id) => {
1374 assert_eq!(peer_id, peer);
1375 }
1376 _ => unreachable!(),
1377 }
1378 match event!(peers) {
1379 PeerAction::Connect { peer_id, .. } => {
1380 assert_eq!(peer_id, peer);
1381 }
1382 _ => unreachable!(),
1383 }
1384
1385 poll_fn(|cx| {
1386 assert!(peers.poll(cx).is_pending());
1387 Poll::Ready(())
1388 })
1389 .await;
1390
1391 peers.on_outgoing_pending_session_dropped(
1392 &socket_addr,
1393 &peer,
1394 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1395 EthHandshakeError::NoResponse,
1396 )),
1397 );
1398
1399 poll_fn(|cx| {
1400 assert!(peers.poll(cx).is_pending());
1401 Poll::Ready(())
1402 })
1403 .await;
1404
1405 assert!(peers.backed_off_peers.contains_key(&peer));
1406 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1407
1408 tokio::time::sleep(backoff_durations.high).await;
1409
1410 match event!(peers) {
1411 PeerAction::Connect { peer_id, .. } => {
1412 assert_eq!(peer_id, peer);
1413 }
1414 _ => unreachable!(),
1415 }
1416
1417 assert!(!peers.backed_off_peers.contains_key(&peer));
1418 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1419 }
1420
1421 #[tokio::test]
1422 async fn test_low_backoff() {
1423 let peer = PeerId::random();
1424 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1425 let config = PeersConfig::test();
1426 let mut peers = PeersManager::new(config);
1427 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1428 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1429
1430 let backoff_timestamp = peers
1431 .backoff_durations
1432 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1433
1434 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1435 assert!(backoff_timestamp <= expected);
1436 }
1437
1438 #[tokio::test]
1439 async fn test_multiple_backoff_calculations() {
1440 let peer = PeerId::random();
1441 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1442 let config = PeersConfig::default();
1443 let mut peers = PeersManager::new(config);
1444 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1445 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1446
1447 peer_struct.severe_backoff_counter = 1;
1449
1450 let now = std::time::Instant::now();
1451
1452 peer_struct.severe_backoff_counter += 1;
1454 let backoff_time = peers
1456 .backoff_durations
1457 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1458
1459 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1461
1462 assert!(backoff_time.duration_since(now) > backoff_duration);
1465 }
1466
1467 #[tokio::test]
1468 async fn test_ban_on_active_drop() {
1469 let peer = PeerId::random();
1470 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1471 let mut peers = PeersManager::default();
1472 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1473
1474 match event!(peers) {
1475 PeerAction::PeerAdded(peer_id) => {
1476 assert_eq!(peer_id, peer);
1477 }
1478 _ => unreachable!(),
1479 }
1480 match event!(peers) {
1481 PeerAction::Connect { peer_id, .. } => {
1482 assert_eq!(peer_id, peer);
1483 }
1484 _ => unreachable!(),
1485 }
1486
1487 poll_fn(|cx| {
1488 assert!(peers.poll(cx).is_pending());
1489 Poll::Ready(())
1490 })
1491 .await;
1492
1493 peers.on_active_session_dropped(
1494 &socket_addr,
1495 &peer,
1496 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1497 DisconnectReason::UselessPeer,
1498 )),
1499 );
1500
1501 match event!(peers) {
1502 PeerAction::PeerRemoved(peer_id) => {
1503 assert_eq!(peer_id, peer);
1504 }
1505 _ => unreachable!(),
1506 }
1507 match event!(peers) {
1508 PeerAction::BanPeer { peer_id } => {
1509 assert_eq!(peer_id, peer);
1510 }
1511 _ => unreachable!(),
1512 }
1513
1514 poll_fn(|cx| {
1515 assert!(peers.poll(cx).is_pending());
1516 Poll::Ready(())
1517 })
1518 .await;
1519
1520 assert!(!peers.peers.contains_key(&peer));
1521 }
1522
1523 #[tokio::test]
1524 async fn test_remove_on_max_backoff_count() {
1525 let peer = PeerId::random();
1526 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1527 let config = PeersConfig::test();
1528 let mut peers = PeersManager::new(config.clone());
1529 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1530 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1531
1532 peer_struct.severe_backoff_counter = config.max_backoff_count;
1534
1535 match event!(peers) {
1536 PeerAction::PeerAdded(peer_id) => {
1537 assert_eq!(peer_id, peer);
1538 }
1539 _ => unreachable!(),
1540 }
1541 match event!(peers) {
1542 PeerAction::Connect { peer_id, .. } => {
1543 assert_eq!(peer_id, peer);
1544 }
1545 _ => unreachable!(),
1546 }
1547
1548 poll_fn(|cx| {
1549 assert!(peers.poll(cx).is_pending());
1550 Poll::Ready(())
1551 })
1552 .await;
1553
1554 peers.on_outgoing_pending_session_dropped(
1555 &socket_addr,
1556 &peer,
1557 &PendingSessionHandshakeError::Eth(
1558 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1559 ),
1560 );
1561
1562 match event!(peers) {
1563 PeerAction::PeerRemoved(peer_id) => {
1564 assert_eq!(peer_id, peer);
1565 }
1566 _ => unreachable!(),
1567 }
1568
1569 poll_fn(|cx| {
1570 assert!(peers.poll(cx).is_pending());
1571 Poll::Ready(())
1572 })
1573 .await;
1574
1575 assert!(!peers.peers.contains_key(&peer));
1576 }
1577
1578 #[tokio::test]
1579 async fn test_ban_on_pending_drop() {
1580 let peer = PeerId::random();
1581 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1582 let mut peers = PeersManager::default();
1583 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1584
1585 match event!(peers) {
1586 PeerAction::PeerAdded(peer_id) => {
1587 assert_eq!(peer_id, peer);
1588 }
1589 _ => unreachable!(),
1590 }
1591 match event!(peers) {
1592 PeerAction::Connect { peer_id, .. } => {
1593 assert_eq!(peer_id, peer);
1594 }
1595 _ => unreachable!(),
1596 }
1597
1598 poll_fn(|cx| {
1599 assert!(peers.poll(cx).is_pending());
1600 Poll::Ready(())
1601 })
1602 .await;
1603
1604 peers.on_outgoing_pending_session_dropped(
1605 &socket_addr,
1606 &peer,
1607 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1608 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1609 )),
1610 );
1611
1612 match event!(peers) {
1613 PeerAction::PeerRemoved(peer_id) => {
1614 assert_eq!(peer_id, peer);
1615 }
1616 _ => unreachable!(),
1617 }
1618 match event!(peers) {
1619 PeerAction::BanPeer { peer_id } => {
1620 assert_eq!(peer_id, peer);
1621 }
1622 _ => unreachable!(),
1623 }
1624
1625 poll_fn(|cx| {
1626 assert!(peers.poll(cx).is_pending());
1627 Poll::Ready(())
1628 })
1629 .await;
1630
1631 assert!(!peers.peers.contains_key(&peer));
1632 }
1633
1634 #[tokio::test]
1635 async fn test_internally_closed_incoming() {
1636 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1637 let mut peers = PeersManager::default();
1638
1639 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1640 assert_eq!(peers.connection_info.num_pending_in, 1);
1641 peers.on_incoming_pending_session_rejected_internally();
1642 assert_eq!(peers.connection_info.num_pending_in, 0);
1643 }
1644
1645 #[tokio::test]
1646 async fn test_reject_incoming_at_pending_capacity() {
1647 let mut peers = PeersManager::default();
1648
1649 for count in 1..=peers.connection_info.config.max_inbound {
1650 let socket_addr =
1651 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1652 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1653 assert_eq!(peers.connection_info.num_pending_in, count);
1654 }
1655 assert!(peers.connection_info.has_in_capacity());
1656 assert!(!peers.connection_info.has_in_pending_capacity());
1657
1658 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1659 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1660 }
1661
1662 #[tokio::test]
1663 async fn test_closed_incoming() {
1664 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1665 let mut peers = PeersManager::default();
1666
1667 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1668 assert_eq!(peers.connection_info.num_pending_in, 1);
1669 peers.on_incoming_pending_session_gracefully_closed();
1670 assert_eq!(peers.connection_info.num_pending_in, 0);
1671 }
1672
1673 #[tokio::test]
1674 async fn test_dropped_incoming() {
1675 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1676 let ban_duration = Duration::from_millis(500);
1677 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1678 let mut peers = PeersManager::new(config);
1679
1680 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1681 assert_eq!(peers.connection_info.num_pending_in, 1);
1682 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1683 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1684 DisconnectReason::UselessPeer,
1685 )),
1686 ));
1687
1688 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1689 assert_eq!(peers.connection_info.num_pending_in, 0);
1690 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1691
1692 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1693
1694 tokio::time::sleep(ban_duration).await;
1696
1697 poll_fn(|cx| {
1698 let _ = peers.poll(cx);
1699 Poll::Ready(())
1700 })
1701 .await;
1702
1703 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1704 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1705 }
1706
1707 #[tokio::test]
1708 async fn test_reputation_change_connected() {
1709 let peer = PeerId::random();
1710 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1711 let mut peers = PeersManager::default();
1712 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1713
1714 match event!(peers) {
1715 PeerAction::PeerAdded(peer_id) => {
1716 assert_eq!(peer_id, peer);
1717 }
1718 _ => unreachable!(),
1719 }
1720 match event!(peers) {
1721 PeerAction::Connect { peer_id, remote_addr } => {
1722 assert_eq!(peer_id, peer);
1723 assert_eq!(remote_addr, socket_addr);
1724 }
1725 _ => unreachable!(),
1726 }
1727
1728 let p = peers.peers.get_mut(&peer).unwrap();
1729 assert_eq!(p.state, PeerConnectionState::PendingOut);
1730
1731 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1732
1733 let p = peers.peers.get(&peer).unwrap();
1734 assert_eq!(p.state, PeerConnectionState::PendingOut);
1735 assert!(p.is_banned());
1736
1737 peers.on_active_session_gracefully_closed(peer);
1738
1739 let p = peers.peers.get(&peer).unwrap();
1740 assert_eq!(p.state, PeerConnectionState::Idle);
1741 assert!(p.is_banned());
1742
1743 match event!(peers) {
1744 PeerAction::Disconnect { peer_id, .. } => {
1745 assert_eq!(peer_id, peer);
1746 }
1747 _ => unreachable!(),
1748 }
1749 }
1750
1751 #[tokio::test]
1752 async fn accept_incoming_trusted_unknown_peer_address() {
1753 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1754 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1755 let trusted = PeerId::random();
1757 peers.add_trusted_peer_id(trusted);
1758
1759 for i in 0..peers.connection_info.config.max_inbound {
1761 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1762 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1763 let peer_id = PeerId::random();
1764 peers.on_incoming_session_established(peer_id, addr);
1765
1766 match event!(peers) {
1767 PeerAction::PeerAdded(id) => {
1768 assert_eq!(id, peer_id);
1769 }
1770 _ => unreachable!(),
1771 }
1772 }
1773
1774 let untrusted = PeerId::random();
1776 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1777 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1778 peers.on_incoming_session_established(untrusted, socket_addr);
1779
1780 match event!(peers) {
1781 PeerAction::PeerAdded(id) => {
1782 assert_eq!(id, untrusted);
1783 }
1784 _ => unreachable!(),
1785 }
1786
1787 match event!(peers) {
1788 PeerAction::Disconnect { peer_id, reason } => {
1789 assert_eq!(peer_id, untrusted);
1790 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1791 }
1792 _ => unreachable!(),
1793 }
1794
1795 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1796 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1797 peers.on_incoming_session_established(trusted, socket_addr);
1798
1799 match event!(peers) {
1800 PeerAction::PeerAdded(id) => {
1801 assert_eq!(id, trusted);
1802 }
1803 _ => unreachable!(),
1804 }
1805
1806 poll_fn(|cx| {
1807 assert!(peers.poll(cx).is_pending());
1808 Poll::Ready(())
1809 })
1810 .await;
1811
1812 let peer = peers.peers.get(&trusted).unwrap();
1813 assert_eq!(peer.state, PeerConnectionState::In);
1814 }
1815
1816 #[tokio::test]
1817 async fn test_already_connected() {
1818 let peer = PeerId::random();
1819 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1820 let mut peers = PeersManager::default();
1821
1822 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1824 assert_eq!(peers.connection_info.num_pending_in, 1);
1825
1826 peers.on_incoming_session_established(peer, socket_addr);
1829 let p = peers.peers.get_mut(&peer).expect("peer not found");
1830 assert_eq!(p.addr.tcp(), socket_addr);
1831 assert_eq!(peers.connection_info.num_pending_in, 0);
1832 assert_eq!(peers.connection_info.num_inbound, 1);
1833
1834 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1837 assert_eq!(peers.connection_info.num_pending_in, 1);
1838
1839 peers.on_already_connected(Direction::Incoming);
1843
1844 let p = peers.peers.get_mut(&peer).expect("peer not found");
1845 assert_eq!(p.addr.tcp(), socket_addr);
1846 assert_eq!(peers.connection_info.num_pending_in, 0);
1847 assert_eq!(peers.connection_info.num_inbound, 1);
1848 }
1849
1850 #[tokio::test]
1851 async fn test_reputation_change_trusted_peer() {
1852 let peer = PeerId::random();
1853 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1854 let mut peers = PeersManager::default();
1855 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
1856
1857 match event!(peers) {
1858 PeerAction::PeerAdded(peer_id) => {
1859 assert_eq!(peer_id, peer);
1860 }
1861 _ => unreachable!(),
1862 }
1863 match event!(peers) {
1864 PeerAction::Connect { peer_id, remote_addr } => {
1865 assert_eq!(peer_id, peer);
1866 assert_eq!(remote_addr, socket_addr);
1867 }
1868 _ => unreachable!(),
1869 }
1870
1871 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
1872 peers.on_active_outgoing_established(peer);
1873 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
1874
1875 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
1876
1877 {
1878 let p = peers.peers.get(&peer).unwrap();
1879 assert_eq!(p.state, PeerConnectionState::Out);
1880 assert!(!p.is_banned());
1882 }
1883
1884 loop {
1886 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
1887
1888 let p = peers.peers.get(&peer).unwrap();
1889 if p.is_banned() {
1890 break
1891 }
1892 }
1893
1894 match event!(peers) {
1895 PeerAction::Disconnect { peer_id, .. } => {
1896 assert_eq!(peer_id, peer);
1897 }
1898 _ => unreachable!(),
1899 }
1900 }
1901
1902 #[tokio::test]
1903 async fn test_reputation_management() {
1904 let peer = PeerId::random();
1905 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1906 let mut peers = PeersManager::default();
1907 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1908 assert_eq!(peers.get_reputation(&peer), Some(0));
1909
1910 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
1911 assert_eq!(peers.get_reputation(&peer), Some(1024));
1912
1913 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
1914 assert_eq!(peers.get_reputation(&peer), Some(0));
1915 }
1916
1917 #[tokio::test]
1918 async fn test_remove_discovered_active() {
1919 let peer = PeerId::random();
1920 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1921 let mut peers = PeersManager::default();
1922 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1923
1924 match event!(peers) {
1925 PeerAction::PeerAdded(peer_id) => {
1926 assert_eq!(peer_id, peer);
1927 }
1928 _ => unreachable!(),
1929 }
1930 match event!(peers) {
1931 PeerAction::Connect { peer_id, remote_addr } => {
1932 assert_eq!(peer_id, peer);
1933 assert_eq!(remote_addr, socket_addr);
1934 }
1935 _ => unreachable!(),
1936 }
1937
1938 let p = peers.peers.get(&peer).unwrap();
1939 assert_eq!(p.state, PeerConnectionState::PendingOut);
1940
1941 peers.remove_peer(peer);
1942
1943 match event!(peers) {
1944 PeerAction::PeerRemoved(peer_id) => {
1945 assert_eq!(peer_id, peer);
1946 }
1947 _ => unreachable!(),
1948 }
1949 match event!(peers) {
1950 PeerAction::Disconnect { peer_id, .. } => {
1951 assert_eq!(peer_id, peer);
1952 }
1953 _ => unreachable!(),
1954 }
1955
1956 let p = peers.peers.get(&peer).unwrap();
1957 assert_eq!(p.state, PeerConnectionState::PendingOut);
1958
1959 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1960 let p = peers.peers.get(&peer).unwrap();
1961 assert_eq!(p.state, PeerConnectionState::PendingOut);
1962
1963 peers.on_active_session_gracefully_closed(peer);
1964 assert!(!peers.peers.contains_key(&peer));
1965 }
1966
1967 #[tokio::test]
1968 async fn test_fatal_outgoing_connection_error_trusted() {
1969 let peer = PeerId::random();
1970 let config = PeersConfig::test()
1971 .with_trusted_nodes(vec![TrustedPeer {
1972 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
1973 tcp_port: 8008,
1974 udp_port: 8008,
1975 id: peer,
1976 }])
1977 .with_trusted_nodes_only(true);
1978 let mut peers = PeersManager::new(config);
1979 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
1980
1981 match event!(peers) {
1982 PeerAction::Connect { peer_id, remote_addr } => {
1983 assert_eq!(peer_id, peer);
1984 assert_eq!(remote_addr, socket_addr);
1985 }
1986 _ => unreachable!(),
1987 }
1988
1989 let p = peers.peers.get(&peer).unwrap();
1990 assert_eq!(p.state, PeerConnectionState::PendingOut);
1991
1992 assert_eq!(peers.num_outbound_connections(), 0);
1993
1994 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1995 EthHandshakeError::NonStatusMessageInHandshake,
1996 ));
1997 assert!(err.is_fatal_protocol_error());
1998
1999 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2000 assert_eq!(peers.num_outbound_connections(), 0);
2001
2002 match event!(peers) {
2004 PeerAction::BanPeer { peer_id } => {
2005 assert_eq!(peer_id, peer);
2006 }
2007 err => unreachable!("{err:?}"),
2008 }
2009
2010 assert!(peers.peers.contains_key(&peer));
2012
2013 tokio::time::sleep(peers.backoff_durations.medium).await;
2015
2016 match event!(peers) {
2017 PeerAction::Connect { peer_id, remote_addr } => {
2018 assert_eq!(peer_id, peer);
2019 assert_eq!(remote_addr, socket_addr);
2020 }
2021 err => unreachable!("{err:?}"),
2022 }
2023 }
2024
2025 #[tokio::test]
2026 async fn test_outgoing_connection_error() {
2027 let peer = PeerId::random();
2028 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2029 let mut peers = PeersManager::default();
2030 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2031
2032 match event!(peers) {
2033 PeerAction::PeerAdded(peer_id) => {
2034 assert_eq!(peer_id, peer);
2035 }
2036 _ => unreachable!(),
2037 }
2038 match event!(peers) {
2039 PeerAction::Connect { peer_id, remote_addr } => {
2040 assert_eq!(peer_id, peer);
2041 assert_eq!(remote_addr, socket_addr);
2042 }
2043 _ => unreachable!(),
2044 }
2045
2046 let p = peers.peers.get(&peer).unwrap();
2047 assert_eq!(p.state, PeerConnectionState::PendingOut);
2048
2049 assert_eq!(peers.num_outbound_connections(), 0);
2050
2051 peers.on_outgoing_connection_failure(
2052 &socket_addr,
2053 &peer,
2054 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2055 );
2056
2057 assert_eq!(peers.num_outbound_connections(), 0);
2058 }
2059
2060 #[tokio::test]
2061 async fn test_outgoing_connection_gracefully_closed() {
2062 let peer = PeerId::random();
2063 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2064 let mut peers = PeersManager::default();
2065 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2066
2067 match event!(peers) {
2068 PeerAction::PeerAdded(peer_id) => {
2069 assert_eq!(peer_id, peer);
2070 }
2071 _ => unreachable!(),
2072 }
2073 match event!(peers) {
2074 PeerAction::Connect { peer_id, remote_addr } => {
2075 assert_eq!(peer_id, peer);
2076 assert_eq!(remote_addr, socket_addr);
2077 }
2078 _ => unreachable!(),
2079 }
2080
2081 let p = peers.peers.get(&peer).unwrap();
2082 assert_eq!(p.state, PeerConnectionState::PendingOut);
2083
2084 assert_eq!(peers.num_outbound_connections(), 0);
2085
2086 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2087
2088 assert_eq!(peers.num_outbound_connections(), 0);
2089 assert_eq!(peers.connection_info.num_pending_out, 0);
2090 }
2091
2092 #[tokio::test]
2093 async fn test_discovery_ban_list() {
2094 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2095 let socket_addr = SocketAddr::new(ip, 8008);
2096 let ban_list = BanList::new(vec![], vec![ip]);
2097 let config = PeersConfig::default().with_ban_list(ban_list);
2098 let mut peer_manager = PeersManager::new(config);
2099 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2100
2101 assert!(peer_manager.peers.is_empty());
2102 }
2103
2104 #[tokio::test]
2105 async fn test_on_pending_ban_list() {
2106 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2107 let socket_addr = SocketAddr::new(ip, 8008);
2108 let ban_list = BanList::new(vec![], vec![ip]);
2109 let config = PeersConfig::test().with_ban_list(ban_list);
2110 let mut peer_manager = PeersManager::new(config);
2111 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2112 match a {
2114 Ok(_) => panic!(),
2115 Err(err) => match err {
2116 InboundConnectionError::IpBanned {} => {
2117 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2118 }
2119 _ => unreachable!(),
2120 },
2121 }
2122 }
2123
2124 #[tokio::test]
2125 async fn test_on_active_inbound_ban_list() {
2126 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2127 let socket_addr = SocketAddr::new(ip, 8008);
2128 let given_peer_id = PeerId::random();
2129 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2130 let config = PeersConfig::test().with_ban_list(ban_list);
2131 let mut peer_manager = PeersManager::new(config);
2132 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2133 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2135 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2136 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2139 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2140
2141 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2142 peer_manager.queued_actions.pop_front()
2143 else {
2144 panic!()
2145 };
2146
2147 assert_eq!(peer_id, given_peer_id)
2148 }
2149
2150 #[test]
2151 fn test_connection_limits() {
2152 let mut info = ConnectionInfo::default();
2153 info.inc_in();
2154 assert_eq!(info.num_inbound, 1);
2155 assert_eq!(info.num_outbound, 0);
2156 assert!(info.has_in_capacity());
2157
2158 info.decr_in();
2159 assert_eq!(info.num_inbound, 0);
2160 assert_eq!(info.num_outbound, 0);
2161
2162 info.inc_out();
2163 assert_eq!(info.num_inbound, 0);
2164 assert_eq!(info.num_outbound, 1);
2165 assert!(info.has_out_capacity());
2166
2167 info.decr_out();
2168 assert_eq!(info.num_inbound, 0);
2169 assert_eq!(info.num_outbound, 0);
2170 }
2171
2172 #[test]
2173 fn test_connection_peer_state() {
2174 let mut info = ConnectionInfo::default();
2175 info.inc_in();
2176
2177 info.decr_state(PeerConnectionState::In);
2178 assert_eq!(info.num_inbound, 0);
2179 assert_eq!(info.num_outbound, 0);
2180
2181 info.inc_out();
2182
2183 info.decr_state(PeerConnectionState::Out);
2184 assert_eq!(info.num_inbound, 0);
2185 assert_eq!(info.num_outbound, 0);
2186 }
2187
2188 #[tokio::test]
2189 async fn test_trusted_peers_are_prioritized() {
2190 let trusted_peer = PeerId::random();
2191 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2192 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2193 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2194 tcp_port: 8008,
2195 udp_port: 8008,
2196 id: trusted_peer,
2197 }]);
2198 let mut peers = PeersManager::new(config);
2199
2200 let basic_peer = PeerId::random();
2201 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2202 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2203
2204 match event!(peers) {
2205 PeerAction::PeerAdded(peer_id) => {
2206 assert_eq!(peer_id, basic_peer);
2207 }
2208 _ => unreachable!(),
2209 }
2210 match event!(peers) {
2211 PeerAction::Connect { peer_id, remote_addr } => {
2212 assert_eq!(peer_id, trusted_peer);
2213 assert_eq!(remote_addr, trusted_sock);
2214 }
2215 _ => unreachable!(),
2216 }
2217 match event!(peers) {
2218 PeerAction::Connect { peer_id, remote_addr } => {
2219 assert_eq!(peer_id, basic_peer);
2220 assert_eq!(remote_addr, basic_sock);
2221 }
2222 _ => unreachable!(),
2223 }
2224 }
2225
2226 #[tokio::test]
2227 async fn test_connect_trusted_nodes_only() {
2228 let trusted_peer = PeerId::random();
2229 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2230 let config = PeersConfig::test()
2231 .with_trusted_nodes(vec![TrustedPeer {
2232 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2233 tcp_port: 8008,
2234 udp_port: 8008,
2235 id: trusted_peer,
2236 }])
2237 .with_trusted_nodes_only(true);
2238 let mut peers = PeersManager::new(config);
2239
2240 let basic_peer = PeerId::random();
2241 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2242 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2243
2244 match event!(peers) {
2245 PeerAction::PeerAdded(peer_id) => {
2246 assert_eq!(peer_id, basic_peer);
2247 }
2248 _ => unreachable!(),
2249 }
2250 match event!(peers) {
2251 PeerAction::Connect { peer_id, remote_addr } => {
2252 assert_eq!(peer_id, trusted_peer);
2253 assert_eq!(remote_addr, trusted_sock);
2254 }
2255 _ => unreachable!(),
2256 }
2257 poll_fn(|cx| {
2258 assert!(peers.poll(cx).is_pending());
2259 Poll::Ready(())
2260 })
2261 .await;
2262 }
2263
2264 #[tokio::test]
2265 async fn test_incoming_with_trusted_nodes_only() {
2266 let trusted_peer = PeerId::random();
2267 let config = PeersConfig::test()
2268 .with_trusted_nodes(vec![TrustedPeer {
2269 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2270 tcp_port: 8008,
2271 udp_port: 8008,
2272 id: trusted_peer,
2273 }])
2274 .with_trusted_nodes_only(true);
2275 let mut peers = PeersManager::new(config);
2276
2277 let basic_peer = PeerId::random();
2278 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2279 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2280 assert_eq!(peers.connection_info.num_pending_in, 1);
2282 peers.on_incoming_session_established(basic_peer, basic_sock);
2283 assert_eq!(peers.connection_info.num_pending_in, 0);
2286 assert_eq!(peers.connection_info.num_inbound, 0);
2287
2288 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2289 peers.queued_actions.pop_front()
2290 else {
2291 panic!()
2292 };
2293 assert_eq!(basic_peer, peer_id);
2294 assert!(!peers.peers.contains_key(&basic_peer));
2295 }
2296
2297 #[tokio::test]
2298 async fn test_incoming_without_trusted_nodes_only() {
2299 let trusted_peer = PeerId::random();
2300 let config = PeersConfig::test()
2301 .with_trusted_nodes(vec![TrustedPeer {
2302 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2303 tcp_port: 8008,
2304 udp_port: 8008,
2305 id: trusted_peer,
2306 }])
2307 .with_trusted_nodes_only(false);
2308 let mut peers = PeersManager::new(config);
2309
2310 let basic_peer = PeerId::random();
2311 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2312 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2313
2314 assert_eq!(peers.connection_info.num_pending_in, 1);
2316 peers.on_incoming_session_established(basic_peer, basic_sock);
2317 assert_eq!(peers.connection_info.num_pending_in, 0);
2320 assert_eq!(peers.connection_info.num_inbound, 1);
2321 assert!(peers.peers.contains_key(&basic_peer));
2322 }
2323
2324 #[tokio::test]
2325 async fn test_incoming_at_capacity() {
2326 let mut config = PeersConfig::test();
2327 config.connection_info.max_inbound = 1;
2328 let mut peers = PeersManager::new(config);
2329
2330 let peer = PeerId::random();
2331 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2332 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2333
2334 peers.on_incoming_session_established(peer, addr);
2335
2336 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2337 assert_eq!(
2338 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2339 InboundConnectionError::ExceedsCapacity
2340 );
2341 }
2342
2343 #[tokio::test]
2344 async fn test_incoming_rate_limit() {
2345 let config = PeersConfig {
2346 incoming_ip_throttle_duration: Duration::from_millis(100),
2347 ..PeersConfig::test()
2348 };
2349 let mut peers = PeersManager::new(config);
2350
2351 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2352 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2353 assert_eq!(
2354 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2355 InboundConnectionError::IpBanned
2356 );
2357
2358 peers.release_interval.reset_immediately();
2359 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2360
2361 poll_fn(|cx| loop {
2363 if peers.poll(cx).is_pending() {
2364 return Poll::Ready(());
2365 }
2366 })
2367 .await;
2368
2369 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2370 assert_eq!(
2371 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2372 InboundConnectionError::IpBanned
2373 );
2374 }
2375
2376 #[tokio::test]
2377 async fn test_tick() {
2378 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2379 let socket_addr = SocketAddr::new(ip, 8008);
2380 let config = PeersConfig::test();
2381 let mut peer_manager = PeersManager::new(config);
2382 let peer_id = PeerId::random();
2383 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2384
2385 tokio::time::sleep(Duration::from_secs(1)).await;
2386 peer_manager.tick();
2387
2388 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2390
2391 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2393
2394 tokio::time::sleep(Duration::from_secs(1)).await;
2395 peer_manager.tick();
2396
2397 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2399
2400 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2401
2402 tokio::time::sleep(Duration::from_secs(1)).await;
2403 peer_manager.tick();
2404
2405 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2407 }
2408
2409 #[tokio::test]
2410 async fn test_remove_incoming_after_disconnect() {
2411 let peer_id = PeerId::random();
2412 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2413 let mut peers = PeersManager::default();
2414
2415 peers.on_incoming_pending_session(addr.ip()).unwrap();
2416 peers.on_incoming_session_established(peer_id, addr);
2417 let peer = peers.peers.get(&peer_id).unwrap();
2418 assert_eq!(peer.state, PeerConnectionState::In);
2419 assert!(peer.remove_after_disconnect);
2420
2421 peers.on_active_session_gracefully_closed(peer_id);
2422 assert!(!peers.peers.contains_key(&peer_id))
2423 }
2424
2425 #[tokio::test]
2426 async fn test_keep_incoming_after_disconnect_if_discovered() {
2427 let peer_id = PeerId::random();
2428 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2429 let mut peers = PeersManager::default();
2430
2431 peers.on_incoming_pending_session(addr.ip()).unwrap();
2432 peers.on_incoming_session_established(peer_id, addr);
2433 let peer = peers.peers.get(&peer_id).unwrap();
2434 assert_eq!(peer.state, PeerConnectionState::In);
2435 assert!(peer.remove_after_disconnect);
2436
2437 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2439
2440 peers.on_active_session_gracefully_closed(peer_id);
2441
2442 let peer = peers.peers.get(&peer_id).unwrap();
2443 assert_eq!(peer.state, PeerConnectionState::Idle);
2444 assert!(!peer.remove_after_disconnect);
2445 }
2446
2447 #[tokio::test]
2448 async fn test_incoming_outgoing_already_connected() {
2449 let peer_id = PeerId::random();
2450 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2451 let mut peers = PeersManager::default();
2452
2453 peers.on_incoming_pending_session(addr.ip()).unwrap();
2454 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2455
2456 match event!(peers) {
2457 PeerAction::PeerAdded(_) => {}
2458 _ => unreachable!(),
2459 }
2460
2461 match event!(peers) {
2462 PeerAction::Connect { .. } => {}
2463 _ => unreachable!(),
2464 }
2465
2466 peers.on_incoming_session_established(peer_id, addr);
2467 peers.on_already_connected(Direction::Outgoing(peer_id));
2468 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2469 assert_eq!(peers.connection_info.num_inbound, 1);
2470 assert_eq!(peers.connection_info.num_pending_out, 0);
2471 assert_eq!(peers.connection_info.num_pending_in, 0);
2472 assert_eq!(peers.connection_info.num_outbound, 0);
2473 }
2474
2475 #[tokio::test]
2476 async fn test_already_connected_incoming_outgoing_connection_error() {
2477 let peer_id = PeerId::random();
2478 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2479 let mut peers = PeersManager::default();
2480
2481 peers.on_incoming_pending_session(addr.ip()).unwrap();
2482 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2483
2484 match event!(peers) {
2485 PeerAction::PeerAdded(_) => {}
2486 _ => unreachable!(),
2487 }
2488
2489 match event!(peers) {
2490 PeerAction::Connect { .. } => {}
2491 _ => unreachable!(),
2492 }
2493
2494 peers.on_incoming_session_established(peer_id, addr);
2495
2496 peers.on_outgoing_connection_failure(
2497 &addr,
2498 &peer_id,
2499 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2500 );
2501 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2502 assert_eq!(peers.connection_info.num_inbound, 1);
2503 assert_eq!(peers.connection_info.num_pending_out, 0);
2504 assert_eq!(peers.connection_info.num_pending_in, 0);
2505 assert_eq!(peers.connection_info.num_outbound, 0);
2506 }
2507
2508 #[tokio::test]
2509 async fn test_max_concurrent_dials() {
2510 let config = PeersConfig::default();
2511 let mut peer_manager = PeersManager::new(config);
2512 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2513 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2514 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2515 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2516 }
2517
2518 peer_manager.fill_outbound_slots();
2519 let dials = peer_manager
2520 .queued_actions
2521 .iter()
2522 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2523 .count();
2524 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2525 }
2526
2527 #[tokio::test]
2528 async fn test_max_num_of_pending_dials() {
2529 let config = PeersConfig::default();
2530 let mut peer_manager = PeersManager::new(config);
2531 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2532 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2533
2534 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2536 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2537 }
2538
2539 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2540 match event!(peer_manager) {
2541 PeerAction::PeerAdded(_) => {}
2542 _ => unreachable!(),
2543 }
2544 }
2545
2546 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2547 match event!(peer_manager) {
2548 PeerAction::Connect { .. } => {}
2549 _ => unreachable!(),
2550 }
2551 }
2552
2553 peer_manager.fill_outbound_slots();
2555
2556 let dials = peer_manager.connection_info.num_pending_out;
2558 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2559
2560 let num_pendingout_states = peer_manager
2561 .peers
2562 .iter()
2563 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2564 .map(|(peer_id, _)| *peer_id)
2565 .collect::<Vec<PeerId>>();
2566 assert_eq!(
2567 num_pendingout_states.len(),
2568 peer_manager.connection_info.config.max_concurrent_outbound_dials
2569 );
2570
2571 for peer_id in &num_pendingout_states {
2573 peer_manager.on_active_outgoing_established(*peer_id);
2574 }
2575
2576 for peer_id in &num_pendingout_states {
2578 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2579 }
2580
2581 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2583 }
2584
2585 #[tokio::test]
2586 async fn test_connect() {
2587 let peer = PeerId::random();
2588 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2589 let mut peers = PeersManager::default();
2590 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2591 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2592
2593 match event!(peers) {
2594 PeerAction::Connect { peer_id, remote_addr } => {
2595 assert_eq!(peer_id, peer);
2596 assert_eq!(remote_addr, socket_addr);
2597 }
2598 _ => unreachable!(),
2599 }
2600
2601 let (record, _) = peers.peer_by_id(peer).unwrap();
2602 assert_eq!(record.tcp_addr(), socket_addr);
2603 assert_eq!(record.udp_addr(), socket_addr);
2604
2605 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2607
2608 let (record, _) = peers.peer_by_id(peer).unwrap();
2609 assert_eq!(record.tcp_addr(), socket_addr);
2610 assert_eq!(record.udp_addr(), socket_addr);
2611 }
2612}