reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7};
8use futures::StreamExt;
9use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
10use reth_ethereum_forks::ForkId;
11use reth_net_banlist::BanList;
12use reth_network_api::test_utils::{PeerCommand, PeersHandle};
13use reth_network_peers::{NodeRecord, PeerId};
14use reth_network_types::{
15    peers::{
16        config::PeerBackoffDurations,
17        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
18    },
19    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
20    ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
21};
22use std::{
23    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
24    fmt::Display,
25    io::{self},
26    net::{IpAddr, SocketAddr},
27    task::{Context, Poll},
28    time::Duration,
29};
30use thiserror::Error;
31use tokio::{
32    sync::mpsc,
33    time::{Instant, Interval},
34};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36use tracing::{trace, warn};
37
38/// Maintains the state of _all_ the peers known to the network.
39///
40/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
41/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
42///
43/// The [`PeersManager`] will be notified on peer related changes
44#[derive(Debug)]
45pub struct PeersManager {
46    /// All peers known to the network
47    peers: HashMap<PeerId, Peer>,
48    /// The set of trusted peer ids.
49    ///
50    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
51    /// an address: [`Self::add_trusted_peer_id`]
52    trusted_peer_ids: HashSet<PeerId>,
53    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
54    manager_tx: mpsc::UnboundedSender<PeerCommand>,
55    /// Receiver half of the command channel.
56    handle_rx: UnboundedReceiverStream<PeerCommand>,
57    /// Buffered actions until the manager is polled.
58    queued_actions: VecDeque<PeerAction>,
59    /// Interval for triggering connections if there are free slots.
60    refill_slots_interval: Interval,
61    /// How to weigh reputation changes
62    reputation_weights: ReputationChangeWeights,
63    /// Tracks current slot stats.
64    connection_info: ConnectionInfo,
65    /// Tracks unwanted ips/peer ids.
66    ban_list: BanList,
67    /// Tracks currently backed off peers.
68    backed_off_peers: HashMap<PeerId, std::time::Instant>,
69    /// Interval at which to check for peers to unban and release from the backoff map.
70    release_interval: Interval,
71    /// How long to ban bad peers.
72    ban_duration: Duration,
73    /// How long peers to which we could not connect for non-fatal reasons, e.g.
74    /// [`DisconnectReason::TooManyPeers`], are put in time out.
75    backoff_durations: PeerBackoffDurations,
76    /// If non-trusted peers should be connected to, or the connection from non-trusted
77    /// incoming peers should be accepted.
78    trusted_nodes_only: bool,
79    /// Timestamp of the last time [`Self::tick`] was called.
80    last_tick: Instant,
81    /// Maximum number of backoff attempts before we give up on a peer and dropping.
82    max_backoff_count: u8,
83    /// Tracks the connection state of the node
84    net_connection_state: NetworkConnectionState,
85    /// How long to temporarily ban ip on an incoming connection attempt.
86    incoming_ip_throttle_duration: Duration,
87}
88
89impl PeersManager {
90    /// Create a new instance with the given config
91    pub fn new(config: PeersConfig) -> Self {
92        let PeersConfig {
93            refill_slots_interval,
94            connection_info,
95            reputation_weights,
96            ban_list,
97            ban_duration,
98            backoff_durations,
99            trusted_nodes,
100            trusted_nodes_only,
101            basic_nodes,
102            max_backoff_count,
103            incoming_ip_throttle_duration,
104        } = config;
105        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
106        let now = Instant::now();
107
108        // We use half of the interval to decrease the max duration to `150%` in worst case
109        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
110
111        let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
112        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
113
114        for trusted_peer in trusted_nodes {
115            match trusted_peer.resolve_blocking() {
116                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
117                    trusted_peer_ids.insert(id);
118                    peers.entry(id).or_insert_with(|| {
119                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
120                    });
121                }
122                Err(err) => {
123                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
124                }
125            }
126        }
127
128        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
129            peers.entry(id).or_insert_with(|| {
130                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
131            });
132        }
133
134        Self {
135            peers,
136            trusted_peer_ids,
137            manager_tx,
138            handle_rx: UnboundedReceiverStream::new(handle_rx),
139            queued_actions: Default::default(),
140            reputation_weights,
141            refill_slots_interval: tokio::time::interval(refill_slots_interval),
142            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
143            connection_info: ConnectionInfo::new(connection_info),
144            ban_list,
145            backed_off_peers: Default::default(),
146            ban_duration,
147            backoff_durations,
148            trusted_nodes_only,
149            last_tick: Instant::now(),
150            max_backoff_count,
151            net_connection_state: NetworkConnectionState::default(),
152            incoming_ip_throttle_duration,
153        }
154    }
155
156    /// Returns a new [`PeersHandle`] that can send commands to this type.
157    pub(crate) fn handle(&self) -> PeersHandle {
158        PeersHandle::new(self.manager_tx.clone())
159    }
160
161    /// Returns the number of peers in the peer set
162    #[inline]
163    pub(crate) fn num_known_peers(&self) -> usize {
164        self.peers.len()
165    }
166
167    /// Returns an iterator over all peers
168    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
169        self.peers.iter().map(|(peer_id, v)| {
170            NodeRecord::new_with_ports(
171                v.addr.tcp().ip(),
172                v.addr.tcp().port(),
173                v.addr.udp().map(|addr| addr.port()),
174                *peer_id,
175            )
176        })
177    }
178
179    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
180    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
181        self.peers.get(&peer_id).map(|v| {
182            (
183                NodeRecord::new_with_ports(
184                    v.addr.tcp().ip(),
185                    v.addr.tcp().port(),
186                    v.addr.udp().map(|addr| addr.port()),
187                    peer_id,
188                ),
189                v.kind,
190            )
191        })
192    }
193
194    /// Returns an iterator over all peer ids for peers with the given kind
195    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
196        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
197    }
198
199    /// Returns the number of currently active inbound connections.
200    #[inline]
201    pub(crate) const fn num_inbound_connections(&self) -> usize {
202        self.connection_info.num_inbound
203    }
204
205    /// Returns the number of currently __active__ outbound connections.
206    #[inline]
207    pub(crate) const fn num_outbound_connections(&self) -> usize {
208        self.connection_info.num_outbound
209    }
210
211    /// Returns the number of currently pending outbound connections.
212    #[inline]
213    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
214        self.connection_info.num_pending_out
215    }
216
217    /// Returns the number of currently backed off peers.
218    #[inline]
219    pub(crate) fn num_backed_off_peers(&self) -> usize {
220        self.backed_off_peers.len()
221    }
222
223    /// Returns the number of idle trusted peers.
224    fn num_idle_trusted_peers(&self) -> usize {
225        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
226    }
227
228    /// Invoked when a new _incoming_ tcp connection is accepted.
229    ///
230    /// returns an error if the inbound ip address is on the ban list
231    pub(crate) fn on_incoming_pending_session(
232        &mut self,
233        addr: IpAddr,
234    ) -> Result<(), InboundConnectionError> {
235        if self.ban_list.is_banned_ip(&addr) {
236            return Err(InboundConnectionError::IpBanned)
237        }
238
239        // check if we even have slots for a new incoming connection
240        if !self.connection_info.has_in_capacity() {
241            if self.trusted_peer_ids.is_empty() {
242                // if we don't have any incoming slots and no trusted peers, we don't accept any new
243                // connections
244                return Err(InboundConnectionError::ExceedsCapacity)
245            }
246
247            // there's an edge case here where no incoming connections besides from trusted peers
248            // are allowed (max_inbound == 0), in which case we still need to allow new pending
249            // incoming connections until all trusted peers are connected.
250            let num_idle_trusted_peers = self.num_idle_trusted_peers();
251            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
252                // we still want to limit concurrent pending connections
253                let max_inbound =
254                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
255                if self.connection_info.num_pending_in <= max_inbound {
256                    self.connection_info.inc_pending_in();
257                }
258                return Ok(())
259            }
260
261            // all trusted peers are either connected or connecting
262            return Err(InboundConnectionError::ExceedsCapacity)
263        }
264
265        // also cap the incoming connections we can process at once
266        if !self.connection_info.has_in_pending_capacity() {
267            return Err(InboundConnectionError::ExceedsCapacity)
268        }
269
270        // apply the rate limit
271        self.throttle_incoming_ip(addr);
272
273        self.connection_info.inc_pending_in();
274        Ok(())
275    }
276
277    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
278    /// rejected.
279    pub(crate) fn on_incoming_pending_session_rejected_internally(&mut self) {
280        self.connection_info.decr_pending_in();
281    }
282
283    /// Invoked when a pending session was closed.
284    pub(crate) fn on_incoming_pending_session_gracefully_closed(&mut self) {
285        self.connection_info.decr_pending_in()
286    }
287
288    /// Invoked when a pending session was closed.
289    pub(crate) fn on_incoming_pending_session_dropped(
290        &mut self,
291        remote_addr: SocketAddr,
292        err: &PendingSessionHandshakeError,
293    ) {
294        if err.is_fatal_protocol_error() {
295            self.ban_ip(remote_addr.ip());
296
297            if err.merits_discovery_ban() {
298                self.queued_actions
299                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
300            }
301        }
302
303        self.connection_info.decr_pending_in();
304    }
305
306    /// Called when a new _incoming_ active session was established to the given peer.
307    ///
308    /// This will update the state of the peer if not yet tracked.
309    ///
310    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
311    /// be scheduled.
312    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
313        self.connection_info.decr_pending_in();
314
315        // we only need to check the peer id here as the ip address will have been checked at
316        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
317        if self.ban_list.is_banned_peer(&peer_id) {
318            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
319            return
320        }
321
322        // check if the peer is trustable or not
323        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
324        if self.trusted_nodes_only && !is_trusted {
325            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
326            return
327        }
328
329        // start a new tick, so the peer is not immediately rewarded for the time since last tick
330        self.tick();
331
332        let has_in_capacity = self.connection_info.has_in_capacity();
333        self.connection_info.inc_in();
334
335        match self.peers.entry(peer_id) {
336            Entry::Occupied(mut entry) => {
337                let peer = entry.get_mut();
338                if peer.is_banned() {
339                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
340                    return
341                }
342                // it might be the case that we're also trying to connect to this peer at the same
343                // time, so we need to adjust the state here
344                if peer.state.is_pending_out() {
345                    self.connection_info.decr_state(peer.state);
346                }
347
348                peer.state = PeerConnectionState::In;
349
350                is_trusted = is_trusted || peer.is_trusted();
351            }
352            Entry::Vacant(entry) => {
353                // peer is missing in the table, we add it but mark it as to be removed after
354                // disconnect, because we only know the outgoing port
355                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
356                peer.remove_after_disconnect = true;
357                entry.insert(peer);
358                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
359            }
360        }
361
362        // disconnect the peer if we don't have capacity for more inbound connections
363        if !is_trusted && !has_in_capacity {
364            self.queued_actions.push_back(PeerAction::Disconnect {
365                peer_id,
366                reason: Some(DisconnectReason::TooManyPeers),
367            });
368        }
369    }
370
371    /// Bans the peer temporarily with the configured ban timeout
372    fn ban_peer(&mut self, peer_id: PeerId) {
373        let mut ban_duration = self.ban_duration;
374        if let Some(peer) = self.peers.get(&peer_id) {
375            if peer.is_trusted() || peer.is_static() {
376                // For misbehaving trusted or static peers, we provide a bit more leeway when
377                // penalizing them.
378                ban_duration = self.backoff_durations.low / 2;
379            }
380        }
381
382        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
383        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
384    }
385
386    /// Bans the IP temporarily with the configured ban timeout
387    fn ban_ip(&mut self, ip: IpAddr) {
388        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
389    }
390
391    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
392    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
393        self.ban_list
394            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
395    }
396
397    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
398    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
399        trace!(target: "net::peers", ?peer_id, "backing off");
400
401        if let Some(peer) = self.peers.get_mut(&peer_id) {
402            peer.backed_off = true;
403            self.backed_off_peers.insert(peer_id, until);
404        }
405    }
406
407    /// Unbans the peer
408    fn unban_peer(&mut self, peer_id: PeerId) {
409        self.ban_list.unban_peer(&peer_id);
410        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
411    }
412
413    /// Tick function to update reputation of all connected peers.
414    /// Peers are rewarded with reputation increases for the time they are connected since the last
415    /// tick. This is to prevent peers from being disconnected eventually due to slashed
416    /// reputation because of some bad messages (most likely transaction related)
417    fn tick(&mut self) {
418        let now = Instant::now();
419        // Determine the number of seconds since the last tick.
420        // Ensuring that now is always greater than last_tick to account for issues with system
421        // time.
422        let secs_since_last_tick =
423            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
424        self.last_tick = now;
425
426        // update reputation via seconds connected
427        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
428            // update reputation via seconds connected, but keep the target _around_ the default
429            // reputation.
430            if peer.1.reputation < DEFAULT_REPUTATION {
431                peer.1.reputation += secs_since_last_tick;
432            }
433        }
434    }
435
436    /// Returns the tracked reputation for a peer.
437    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
438        self.peers.get(peer_id).map(|peer| peer.reputation)
439    }
440
441    /// Apply the corresponding reputation change to the given peer.
442    ///
443    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
444    /// reputation changes that can be attributed to network conditions. If the peer is a
445    /// trusted peer, it will also be less strict with the reputation slashing.
446    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
447        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
448            // First check if we should reset the reputation
449            if rep.is_reset() {
450                peer.reset_reputation()
451            } else {
452                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
453                if peer.is_trusted() || peer.is_static() {
454                    // exempt trusted and static peers from reputation slashing for
455                    if matches!(
456                        rep,
457                        ReputationChangeKind::Dropped |
458                            ReputationChangeKind::BadAnnouncement |
459                            ReputationChangeKind::Timeout |
460                            ReputationChangeKind::AlreadySeenTransaction
461                    ) {
462                        return
463                    }
464
465                    // also be less strict with the reputation slashing for trusted peers
466                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
467                        // this caps the reputation change to the maximum allowed for trusted peers
468                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
469                    }
470                }
471                peer.apply_reputation(reputation_change)
472            }
473        } else {
474            return
475        };
476
477        match outcome {
478            ReputationChangeOutcome::None => {}
479            ReputationChangeOutcome::Ban => {
480                self.ban_peer(*peer_id);
481            }
482            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
483            ReputationChangeOutcome::DisconnectAndBan => {
484                self.queued_actions.push_back(PeerAction::Disconnect {
485                    peer_id: *peer_id,
486                    reason: Some(DisconnectReason::DisconnectRequested),
487                });
488                self.ban_peer(*peer_id);
489            }
490        }
491    }
492
493    /// Gracefully disconnected a pending _outgoing_ session
494    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
495        if let Some(peer) = self.peers.get_mut(peer_id) {
496            self.connection_info.decr_state(peer.state);
497            peer.state = PeerConnectionState::Idle;
498        }
499    }
500
501    /// Invoked when an _outgoing_ pending session was closed during authentication or the
502    /// handshake.
503    pub(crate) fn on_outgoing_pending_session_dropped(
504        &mut self,
505        remote_addr: &SocketAddr,
506        peer_id: &PeerId,
507        err: &PendingSessionHandshakeError,
508    ) {
509        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
510    }
511
512    /// Gracefully disconnected an active session
513    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
514        match self.peers.entry(peer_id) {
515            Entry::Occupied(mut entry) => {
516                self.connection_info.decr_state(entry.get().state);
517
518                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
519                    // this peer should be removed from the set
520                    entry.remove();
521                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
522                } else {
523                    // reset the peer's state
524                    // we reset the backoff counter since we're able to establish a successful
525                    // session to that peer
526                    entry.get_mut().severe_backoff_counter = 0;
527                    entry.get_mut().state = PeerConnectionState::Idle;
528                    return
529                }
530            }
531            Entry::Vacant(_) => return,
532        }
533
534        self.fill_outbound_slots();
535    }
536
537    /// Called when a _pending_ outbound connection is successful.
538    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
539        if let Some(peer) = self.peers.get_mut(&peer_id) {
540            self.connection_info.decr_state(peer.state);
541            self.connection_info.inc_out();
542            peer.state = PeerConnectionState::Out;
543        }
544    }
545
546    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
547    ///
548    /// Depending on whether the error is fatal, the peer will be removed from the peer set
549    /// otherwise its reputation is slashed.
550    pub(crate) fn on_active_session_dropped(
551        &mut self,
552        remote_addr: &SocketAddr,
553        peer_id: &PeerId,
554        err: &EthStreamError,
555    ) {
556        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
557    }
558
559    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
560    /// connection.
561    pub(crate) fn on_outgoing_connection_failure(
562        &mut self,
563        remote_addr: &SocketAddr,
564        peer_id: &PeerId,
565        err: &io::Error,
566    ) {
567        // there's a race condition where we accepted an incoming connection while we were trying to
568        // connect to the same peer at the same time. if the outgoing connection failed
569        // after the incoming connection was accepted, we can ignore this error
570        if let Some(peer) = self.peers.get(peer_id) {
571            if peer.state.is_incoming() {
572                // we already have an active connection to the peer, so we can ignore this error
573                return
574            }
575        }
576
577        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
578    }
579
580    fn on_connection_failure(
581        &mut self,
582        remote_addr: &SocketAddr,
583        peer_id: &PeerId,
584        err: impl SessionError,
585        reputation_change: ReputationChangeKind,
586    ) {
587        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
588
589        if err.is_fatal_protocol_error() {
590            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
591            // remove the peer to which we can't establish a connection due to protocol related
592            // issues.
593            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
594                self.connection_info.decr_state(entry.get().state);
595                // only remove if the peer is not trusted
596                if entry.get().is_trusted() {
597                    entry.get_mut().state = PeerConnectionState::Idle;
598                } else {
599                    entry.remove();
600                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
601                    // If the error is caused by a peer that should be banned from discovery
602                    if err.merits_discovery_ban() {
603                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
604                            peer_id: *peer_id,
605                            ip_addr: remote_addr.ip(),
606                        })
607                    }
608                }
609            }
610
611            // ban the peer
612            self.ban_peer(*peer_id);
613        } else {
614            let mut backoff_until = None;
615            let mut remove_peer = false;
616
617            if let Some(peer) = self.peers.get_mut(peer_id) {
618                if let Some(kind) = err.should_backoff() {
619                    // Increment peer.backoff_counter
620                    if kind.is_severe() {
621                        peer.severe_backoff_counter = peer.severe_backoff_counter.saturating_add(1);
622                    }
623
624                    let backoff_time =
625                        self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
626
627                    // The peer has signaled that it is currently unable to process any more
628                    // connections, so we will hold off on attempting any new connections for a
629                    // while
630                    backoff_until = Some(backoff_time);
631                } else {
632                    // If the error was not a backoff error, we reduce the peer's reputation
633                    let reputation_change = self.reputation_weights.change(reputation_change);
634                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
635                };
636
637                self.connection_info.decr_state(peer.state);
638                peer.state = PeerConnectionState::Idle;
639
640                if peer.severe_backoff_counter > self.max_backoff_count && !peer.is_trusted() {
641                    // mark peer for removal if it has been backoff too many times and is _not_
642                    // trusted
643                    remove_peer = true;
644                }
645            }
646
647            // remove peer if it has been marked for removal
648            if remove_peer {
649                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
650                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
651            } else if let Some(backoff_until) = backoff_until {
652                // otherwise, backoff the peer if marked as such
653                self.backoff_peer_until(*peer_id, backoff_until);
654            }
655        }
656
657        self.fill_outbound_slots();
658    }
659
660    /// Invoked if a pending session was disconnected because there's already a connection to the
661    /// peer.
662    ///
663    /// If the session was an outgoing connection, this means that the peer initiated a connection
664    /// to us at the same time and this connection is already established.
665    pub(crate) fn on_already_connected(&mut self, direction: Direction) {
666        match direction {
667            Direction::Incoming => {
668                // need to decrement the ingoing counter
669                self.connection_info.decr_pending_in();
670            }
671            Direction::Outgoing(_) => {
672                // cleanup is handled when the incoming active session is activated in
673                // `on_incoming_session_established`
674            }
675        }
676    }
677
678    /// Called as follow-up for a discovered peer.
679    ///
680    /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
681    /// protocol
682    pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
683        if let Some(peer) = self.peers.get_mut(&peer_id) {
684            trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
685            peer.fork_id = Some(fork_id);
686        }
687    }
688
689    /// Called for a newly discovered peer.
690    ///
691    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
692    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
693        self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
694    }
695
696    /// Marks the given peer as trusted.
697    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
698        self.trusted_peer_ids.insert(peer_id);
699    }
700
701    /// Called for a newly discovered trusted peer.
702    ///
703    /// If the peer already exists, then the address and kind will be updated.
704    #[allow(dead_code)]
705    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
706        self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
707    }
708
709    /// Called for a newly discovered peer.
710    ///
711    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
712    pub(crate) fn add_peer_kind(
713        &mut self,
714        peer_id: PeerId,
715        kind: PeerKind,
716        addr: PeerAddr,
717        fork_id: Option<ForkId>,
718    ) {
719        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
720            return
721        }
722
723        match self.peers.entry(peer_id) {
724            Entry::Occupied(mut entry) => {
725                let peer = entry.get_mut();
726                peer.kind = kind;
727                peer.fork_id = fork_id;
728                peer.addr = addr;
729
730                if peer.state.is_incoming() {
731                    // now that we have an actual discovered address, for that peer and not just the
732                    // ip of the incoming connection, we don't need to remove the peer after
733                    // disconnecting, See `on_incoming_session_established`
734                    peer.remove_after_disconnect = false;
735                }
736            }
737            Entry::Vacant(entry) => {
738                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
739                let mut peer = Peer::with_kind(addr, kind);
740                peer.fork_id = fork_id;
741                entry.insert(peer);
742                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
743            }
744        }
745
746        if kind.is_trusted() {
747            self.trusted_peer_ids.insert(peer_id);
748        }
749    }
750
751    /// Removes the tracked node from the set.
752    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
753        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
754        if entry.get().is_trusted() {
755            return
756        }
757        let mut peer = entry.remove();
758
759        trace!(target: "net::peers", ?peer_id, "remove discovered node");
760        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
761
762        if peer.state.is_connected() {
763            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
764            // we terminate the active session here, but only remove the peer after the session
765            // was disconnected, this prevents the case where the session is scheduled for
766            // disconnect but the node is immediately rediscovered, See also
767            // [`Self::on_disconnected()`]
768            peer.remove_after_disconnect = true;
769            peer.state.disconnect();
770            self.peers.insert(peer_id, peer);
771            self.queued_actions.push_back(PeerAction::Disconnect {
772                peer_id,
773                reason: Some(DisconnectReason::DisconnectRequested),
774            })
775        }
776    }
777
778    /// Connect to the given peer. NOTE: if the maximum number out outbound sessions is reached,
779    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
780    #[allow(dead_code)]
781    pub(crate) fn add_and_connect(
782        &mut self,
783        peer_id: PeerId,
784        addr: PeerAddr,
785        fork_id: Option<ForkId>,
786    ) {
787        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
788    }
789
790    ///  Connects a peer and its address with the given kind.
791    pub(crate) fn add_and_connect_kind(
792        &mut self,
793        peer_id: PeerId,
794        kind: PeerKind,
795        addr: PeerAddr,
796        fork_id: Option<ForkId>,
797    ) {
798        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
799            return
800        }
801
802        match self.peers.entry(peer_id) {
803            Entry::Vacant(entry) => {
804                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
805                let mut peer = Peer::with_kind(addr, kind);
806                peer.state = PeerConnectionState::PendingOut;
807                peer.fork_id = fork_id;
808                entry.insert(peer);
809                self.queued_actions
810                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
811            }
812            _ => return,
813        }
814
815        if kind.is_trusted() {
816            self.trusted_peer_ids.insert(peer_id);
817        }
818    }
819
820    /// Removes the tracked node from the trusted set.
821    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
822        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
823        if !entry.get().is_trusted() {
824            return
825        }
826
827        let peer = entry.get_mut();
828        peer.kind = PeerKind::Basic;
829
830        self.trusted_peer_ids.remove(&peer_id);
831    }
832
833    /// Returns the idle peer with the highest reputation.
834    ///
835    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
836    /// not currently marked as banned or backed off.
837    ///
838    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
839    /// `trusted` peers.
840    ///
841    /// Returns `None` if no peer is available.
842    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
843        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
844            !peer.is_backed_off() &&
845                !peer.is_banned() &&
846                peer.state.is_unconnected() &&
847                (!self.trusted_nodes_only || peer.is_trusted())
848        });
849
850        // keep track of the best peer, if there's one
851        let mut best_peer = unconnected.next()?;
852
853        if best_peer.1.is_trusted() || best_peer.1.is_static() {
854            return Some((*best_peer.0, best_peer.1))
855        }
856
857        for maybe_better in unconnected {
858            // if the peer is trusted or static, return it immediately
859            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
860                return Some((*maybe_better.0, maybe_better.1))
861            }
862
863            // otherwise we keep track of the best peer using the reputation
864            if maybe_better.1.reputation > best_peer.1.reputation {
865                best_peer = maybe_better;
866            }
867        }
868        Some((*best_peer.0, best_peer.1))
869    }
870
871    /// If there's capacity for new outbound connections, this will queue new
872    /// [`PeerAction::Connect`] actions.
873    ///
874    /// New connections are only initiated, if slots are available and appropriate peers are
875    /// available.
876    fn fill_outbound_slots(&mut self) {
877        self.tick();
878
879        if !self.net_connection_state.is_active() {
880            // nothing to fill
881            return
882        }
883
884        // as long as there are slots available fill them with the best peers
885        while self.connection_info.has_out_capacity() {
886            let action = {
887                let (peer_id, peer) = match self.best_unconnected() {
888                    Some(peer) => peer,
889                    _ => break,
890                };
891
892                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
893
894                peer.state = PeerConnectionState::PendingOut;
895                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
896            };
897
898            self.connection_info.inc_pending_out();
899
900            self.queued_actions.push_back(action);
901        }
902    }
903
904    /// Keeps track of network state changes.
905    pub fn on_network_state_change(&mut self, state: NetworkConnectionState) {
906        self.net_connection_state = state;
907    }
908
909    /// Returns the current network connection state.
910    pub const fn connection_state(&self) -> &NetworkConnectionState {
911        &self.net_connection_state
912    }
913
914    /// Sets `net_connection_state` to `ShuttingDown`.
915    pub fn on_shutdown(&mut self) {
916        self.net_connection_state = NetworkConnectionState::ShuttingDown;
917    }
918
919    /// Advances the state.
920    ///
921    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
922    /// [`PeersManager`] is polled.
923    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
924        loop {
925            // drain buffered actions
926            if let Some(action) = self.queued_actions.pop_front() {
927                return Poll::Ready(action)
928            }
929
930            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
931                match cmd {
932                    PeerCommand::Add(peer_id, addr) => {
933                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
934                    }
935                    PeerCommand::Remove(peer) => self.remove_peer(peer),
936                    PeerCommand::ReputationChange(peer_id, rep) => {
937                        self.apply_reputation_change(&peer_id, rep)
938                    }
939                    PeerCommand::GetPeer(peer, tx) => {
940                        let _ = tx.send(self.peers.get(&peer).cloned());
941                    }
942                    PeerCommand::GetPeers(tx) => {
943                        let _ = tx.send(self.iter_peers().collect());
944                    }
945                }
946            }
947
948            if self.release_interval.poll_tick(cx).is_ready() {
949                let now = std::time::Instant::now();
950                let (_, unbanned_peers) = self.ban_list.evict(now);
951
952                for peer_id in unbanned_peers {
953                    if let Some(peer) = self.peers.get_mut(&peer_id) {
954                        peer.unban();
955                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
956                    }
957                }
958
959                // clear the backoff list of expired backoffs, and mark the relevant peers as
960                // ready to be dialed
961                self.backed_off_peers.retain(|peer_id, until| {
962                    if now > *until {
963                        if let Some(peer) = self.peers.get_mut(peer_id) {
964                            peer.backed_off = false;
965                        }
966                        return false
967                    }
968                    true
969                })
970            }
971
972            while self.refill_slots_interval.poll_tick(cx).is_ready() {
973                self.fill_outbound_slots();
974            }
975
976            if self.queued_actions.is_empty() {
977                return Poll::Pending
978            }
979        }
980    }
981}
982
983impl Default for PeersManager {
984    fn default() -> Self {
985        Self::new(Default::default())
986    }
987}
988
989/// Tracks stats about connected nodes
990#[derive(Debug, Clone, PartialEq, Eq, Default)]
991pub struct ConnectionInfo {
992    /// Counter for currently occupied slots for active outbound connections.
993    num_outbound: usize,
994    /// Counter for pending outbound connections.
995    num_pending_out: usize,
996    /// Counter for currently occupied slots for active inbound connections.
997    num_inbound: usize,
998    /// Counter for pending inbound connections.
999    num_pending_in: usize,
1000    /// Restrictions on number of connections.
1001    config: ConnectionsConfig,
1002}
1003
1004// === impl ConnectionInfo ===
1005
1006impl ConnectionInfo {
1007    /// Returns a new [`ConnectionInfo`] with the given config.
1008    const fn new(config: ConnectionsConfig) -> Self {
1009        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1010    }
1011
1012    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1013    const fn has_out_capacity(&self) -> bool {
1014        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1015            self.num_outbound < self.config.max_outbound
1016    }
1017
1018    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1019    const fn has_in_capacity(&self) -> bool {
1020        self.num_inbound < self.config.max_inbound
1021    }
1022
1023    /// Returns `true` if we can handle an additional incoming pending connection.
1024    const fn has_in_pending_capacity(&self) -> bool {
1025        self.num_pending_in < self.config.max_inbound
1026    }
1027
1028    fn decr_state(&mut self, state: PeerConnectionState) {
1029        match state {
1030            PeerConnectionState::Idle => {}
1031            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1032            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1033            PeerConnectionState::PendingOut => self.decr_pending_out(),
1034        }
1035    }
1036
1037    fn decr_out(&mut self) {
1038        self.num_outbound -= 1;
1039    }
1040
1041    fn inc_out(&mut self) {
1042        self.num_outbound += 1;
1043    }
1044
1045    fn inc_pending_out(&mut self) {
1046        self.num_pending_out += 1;
1047    }
1048
1049    fn inc_in(&mut self) {
1050        self.num_inbound += 1;
1051    }
1052
1053    fn inc_pending_in(&mut self) {
1054        self.num_pending_in += 1;
1055    }
1056
1057    fn decr_in(&mut self) {
1058        self.num_inbound -= 1;
1059    }
1060
1061    fn decr_pending_out(&mut self) {
1062        self.num_pending_out -= 1;
1063    }
1064
1065    fn decr_pending_in(&mut self) {
1066        self.num_pending_in -= 1;
1067    }
1068}
1069
1070/// Actions the peer manager can trigger.
1071#[derive(Debug)]
1072pub enum PeerAction {
1073    /// Start a new connection to a peer.
1074    Connect {
1075        /// The peer to connect to.
1076        peer_id: PeerId,
1077        /// Where to reach the node
1078        remote_addr: SocketAddr,
1079    },
1080    /// Disconnect an existing connection.
1081    Disconnect {
1082        /// The peer ID of the established connection.
1083        peer_id: PeerId,
1084        /// An optional reason for the disconnect.
1085        reason: Option<DisconnectReason>,
1086    },
1087    /// Disconnect an existing incoming connection, because the peers reputation is below the
1088    /// banned threshold or is on the [`BanList`]
1089    DisconnectBannedIncoming {
1090        /// The peer ID of the established connection.
1091        peer_id: PeerId,
1092    },
1093    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1094    DisconnectUntrustedIncoming {
1095        /// The peer ID.
1096        peer_id: PeerId,
1097    },
1098    /// Ban the peer in discovery.
1099    DiscoveryBanPeerId {
1100        /// The peer ID.
1101        peer_id: PeerId,
1102        /// The IP address.
1103        ip_addr: IpAddr,
1104    },
1105    /// Ban the IP in discovery.
1106    DiscoveryBanIp {
1107        /// The IP address.
1108        ip_addr: IpAddr,
1109    },
1110    /// Ban the peer temporarily
1111    BanPeer {
1112        /// The peer ID.
1113        peer_id: PeerId,
1114    },
1115    /// Unban the peer temporarily
1116    UnBanPeer {
1117        /// The peer ID.
1118        peer_id: PeerId,
1119    },
1120    /// Emit peerAdded event
1121    PeerAdded(PeerId),
1122    /// Emit peerRemoved event
1123    PeerRemoved(PeerId),
1124}
1125
1126/// Error thrown when a incoming connection is rejected right away
1127#[derive(Debug, Error, PartialEq, Eq)]
1128pub enum InboundConnectionError {
1129    /// The remote's ip address is banned
1130    IpBanned,
1131    /// No capacity for new inbound connections
1132    ExceedsCapacity,
1133}
1134
1135impl Display for InboundConnectionError {
1136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1137        write!(f, "{self:?}")
1138    }
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143    use alloy_primitives::B512;
1144    use reth_eth_wire::{
1145        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1146        DisconnectReason,
1147    };
1148    use reth_net_banlist::BanList;
1149    use reth_network_api::Direction;
1150    use reth_network_peers::{PeerId, TrustedPeer};
1151    use reth_network_types::{
1152        peers::reputation::DEFAULT_REPUTATION, BackoffKind, ReputationChangeKind,
1153    };
1154    use std::{
1155        future::{poll_fn, Future},
1156        io,
1157        net::{IpAddr, Ipv4Addr, SocketAddr},
1158        pin::Pin,
1159        task::{Context, Poll},
1160        time::Duration,
1161    };
1162    use url::Host;
1163
1164    use super::PeersManager;
1165    use crate::{
1166        error::SessionError,
1167        peers::{
1168            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1169            PeerConnectionState,
1170        },
1171        session::PendingSessionHandshakeError,
1172        PeersConfig,
1173    };
1174
1175    struct PeerActionFuture<'a> {
1176        peers: &'a mut PeersManager,
1177    }
1178
1179    impl Future for PeerActionFuture<'_> {
1180        type Output = PeerAction;
1181
1182        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1183            self.get_mut().peers.poll(cx)
1184        }
1185    }
1186
1187    macro_rules! event {
1188        ($peers:expr) => {
1189            PeerActionFuture { peers: &mut $peers }.await
1190        };
1191    }
1192
1193    #[tokio::test]
1194    async fn test_insert() {
1195        let peer = PeerId::random();
1196        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1197        let mut peers = PeersManager::default();
1198        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1199
1200        match event!(peers) {
1201            PeerAction::PeerAdded(peer_id) => {
1202                assert_eq!(peer_id, peer);
1203            }
1204            _ => unreachable!(),
1205        }
1206        match event!(peers) {
1207            PeerAction::Connect { peer_id, remote_addr } => {
1208                assert_eq!(peer_id, peer);
1209                assert_eq!(remote_addr, socket_addr);
1210            }
1211            _ => unreachable!(),
1212        }
1213
1214        let (record, _) = peers.peer_by_id(peer).unwrap();
1215        assert_eq!(record.tcp_addr(), socket_addr);
1216        assert_eq!(record.udp_addr(), socket_addr);
1217    }
1218
1219    #[tokio::test]
1220    async fn test_insert_udp() {
1221        let peer = PeerId::random();
1222        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1223        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1224        let mut peers = PeersManager::default();
1225        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1226
1227        match event!(peers) {
1228            PeerAction::PeerAdded(peer_id) => {
1229                assert_eq!(peer_id, peer);
1230            }
1231            _ => unreachable!(),
1232        }
1233        match event!(peers) {
1234            PeerAction::Connect { peer_id, remote_addr } => {
1235                assert_eq!(peer_id, peer);
1236                assert_eq!(remote_addr, tcp_addr);
1237            }
1238            _ => unreachable!(),
1239        }
1240
1241        let (record, _) = peers.peer_by_id(peer).unwrap();
1242        assert_eq!(record.tcp_addr(), tcp_addr);
1243        assert_eq!(record.udp_addr(), udp_addr);
1244    }
1245
1246    #[tokio::test]
1247    async fn test_ban() {
1248        let peer = PeerId::random();
1249        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1250        let mut peers = PeersManager::default();
1251        peers.ban_peer(peer);
1252        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1253
1254        match event!(peers) {
1255            PeerAction::BanPeer { peer_id } => {
1256                assert_eq!(peer_id, peer);
1257            }
1258            _ => unreachable!(),
1259        }
1260
1261        poll_fn(|cx| {
1262            assert!(peers.poll(cx).is_pending());
1263            Poll::Ready(())
1264        })
1265        .await;
1266    }
1267
1268    #[tokio::test]
1269    async fn test_unban() {
1270        let peer = PeerId::random();
1271        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1272        let mut peers = PeersManager::default();
1273        peers.ban_peer(peer);
1274        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1275
1276        match event!(peers) {
1277            PeerAction::BanPeer { peer_id } => {
1278                assert_eq!(peer_id, peer);
1279            }
1280            _ => unreachable!(),
1281        }
1282
1283        poll_fn(|cx| {
1284            assert!(peers.poll(cx).is_pending());
1285            Poll::Ready(())
1286        })
1287        .await;
1288
1289        peers.unban_peer(peer);
1290
1291        match event!(peers) {
1292            PeerAction::UnBanPeer { peer_id } => {
1293                assert_eq!(peer_id, peer);
1294            }
1295            _ => unreachable!(),
1296        }
1297
1298        poll_fn(|cx| {
1299            assert!(peers.poll(cx).is_pending());
1300            Poll::Ready(())
1301        })
1302        .await;
1303    }
1304
1305    #[tokio::test]
1306    async fn test_backoff_on_busy() {
1307        let peer = PeerId::random();
1308        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1309
1310        let mut peers = PeersManager::new(PeersConfig::test());
1311        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1312
1313        match event!(peers) {
1314            PeerAction::PeerAdded(peer_id) => {
1315                assert_eq!(peer_id, peer);
1316            }
1317            _ => unreachable!(),
1318        }
1319        match event!(peers) {
1320            PeerAction::Connect { peer_id, .. } => {
1321                assert_eq!(peer_id, peer);
1322            }
1323            _ => unreachable!(),
1324        }
1325
1326        poll_fn(|cx| {
1327            assert!(peers.poll(cx).is_pending());
1328            Poll::Ready(())
1329        })
1330        .await;
1331
1332        peers.on_active_session_dropped(
1333            &socket_addr,
1334            &peer,
1335            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1336                DisconnectReason::TooManyPeers,
1337            )),
1338        );
1339
1340        poll_fn(|cx| {
1341            assert!(peers.poll(cx).is_pending());
1342            Poll::Ready(())
1343        })
1344        .await;
1345
1346        assert!(peers.backed_off_peers.contains_key(&peer));
1347        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1348
1349        tokio::time::sleep(peers.backoff_durations.low).await;
1350
1351        match event!(peers) {
1352            PeerAction::Connect { peer_id, .. } => {
1353                assert_eq!(peer_id, peer);
1354            }
1355            _ => unreachable!(),
1356        }
1357
1358        assert!(!peers.backed_off_peers.contains_key(&peer));
1359        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1360    }
1361
1362    #[tokio::test]
1363    async fn test_backoff_on_no_response() {
1364        let peer = PeerId::random();
1365        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1366
1367        let backoff_durations = PeerBackoffDurations::test();
1368        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1369        let mut peers = PeersManager::new(config);
1370        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1371
1372        match event!(peers) {
1373            PeerAction::PeerAdded(peer_id) => {
1374                assert_eq!(peer_id, peer);
1375            }
1376            _ => unreachable!(),
1377        }
1378        match event!(peers) {
1379            PeerAction::Connect { peer_id, .. } => {
1380                assert_eq!(peer_id, peer);
1381            }
1382            _ => unreachable!(),
1383        }
1384
1385        poll_fn(|cx| {
1386            assert!(peers.poll(cx).is_pending());
1387            Poll::Ready(())
1388        })
1389        .await;
1390
1391        peers.on_outgoing_pending_session_dropped(
1392            &socket_addr,
1393            &peer,
1394            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1395                EthHandshakeError::NoResponse,
1396            )),
1397        );
1398
1399        poll_fn(|cx| {
1400            assert!(peers.poll(cx).is_pending());
1401            Poll::Ready(())
1402        })
1403        .await;
1404
1405        assert!(peers.backed_off_peers.contains_key(&peer));
1406        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1407
1408        tokio::time::sleep(backoff_durations.high).await;
1409
1410        match event!(peers) {
1411            PeerAction::Connect { peer_id, .. } => {
1412                assert_eq!(peer_id, peer);
1413            }
1414            _ => unreachable!(),
1415        }
1416
1417        assert!(!peers.backed_off_peers.contains_key(&peer));
1418        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1419    }
1420
1421    #[tokio::test]
1422    async fn test_low_backoff() {
1423        let peer = PeerId::random();
1424        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1425        let config = PeersConfig::test();
1426        let mut peers = PeersManager::new(config);
1427        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1428        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1429
1430        let backoff_timestamp = peers
1431            .backoff_durations
1432            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1433
1434        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1435        assert!(backoff_timestamp <= expected);
1436    }
1437
1438    #[tokio::test]
1439    async fn test_multiple_backoff_calculations() {
1440        let peer = PeerId::random();
1441        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1442        let config = PeersConfig::default();
1443        let mut peers = PeersManager::new(config);
1444        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1445        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1446
1447        // Simulate a peer that was already backed off once
1448        peer_struct.severe_backoff_counter = 1;
1449
1450        let now = std::time::Instant::now();
1451
1452        // Simulate the increment that happens in on_connection_failure
1453        peer_struct.severe_backoff_counter += 1;
1454        // Get official backoff time
1455        let backoff_time = peers
1456            .backoff_durations
1457            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1458
1459        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1460        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1461
1462        // We can't use assert_eq! since there is a very small diff in the nano secs
1463        // Usually it is 1800s != 1799.9999996s
1464        assert!(backoff_time.duration_since(now) > backoff_duration);
1465    }
1466
1467    #[tokio::test]
1468    async fn test_ban_on_active_drop() {
1469        let peer = PeerId::random();
1470        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1471        let mut peers = PeersManager::default();
1472        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1473
1474        match event!(peers) {
1475            PeerAction::PeerAdded(peer_id) => {
1476                assert_eq!(peer_id, peer);
1477            }
1478            _ => unreachable!(),
1479        }
1480        match event!(peers) {
1481            PeerAction::Connect { peer_id, .. } => {
1482                assert_eq!(peer_id, peer);
1483            }
1484            _ => unreachable!(),
1485        }
1486
1487        poll_fn(|cx| {
1488            assert!(peers.poll(cx).is_pending());
1489            Poll::Ready(())
1490        })
1491        .await;
1492
1493        peers.on_active_session_dropped(
1494            &socket_addr,
1495            &peer,
1496            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1497                DisconnectReason::UselessPeer,
1498            )),
1499        );
1500
1501        match event!(peers) {
1502            PeerAction::PeerRemoved(peer_id) => {
1503                assert_eq!(peer_id, peer);
1504            }
1505            _ => unreachable!(),
1506        }
1507        match event!(peers) {
1508            PeerAction::BanPeer { peer_id } => {
1509                assert_eq!(peer_id, peer);
1510            }
1511            _ => unreachable!(),
1512        }
1513
1514        poll_fn(|cx| {
1515            assert!(peers.poll(cx).is_pending());
1516            Poll::Ready(())
1517        })
1518        .await;
1519
1520        assert!(!peers.peers.contains_key(&peer));
1521    }
1522
1523    #[tokio::test]
1524    async fn test_remove_on_max_backoff_count() {
1525        let peer = PeerId::random();
1526        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1527        let config = PeersConfig::test();
1528        let mut peers = PeersManager::new(config.clone());
1529        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1530        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1531
1532        // Simulate a peer that was already backed off once
1533        peer_struct.severe_backoff_counter = config.max_backoff_count;
1534
1535        match event!(peers) {
1536            PeerAction::PeerAdded(peer_id) => {
1537                assert_eq!(peer_id, peer);
1538            }
1539            _ => unreachable!(),
1540        }
1541        match event!(peers) {
1542            PeerAction::Connect { peer_id, .. } => {
1543                assert_eq!(peer_id, peer);
1544            }
1545            _ => unreachable!(),
1546        }
1547
1548        poll_fn(|cx| {
1549            assert!(peers.poll(cx).is_pending());
1550            Poll::Ready(())
1551        })
1552        .await;
1553
1554        peers.on_outgoing_pending_session_dropped(
1555            &socket_addr,
1556            &peer,
1557            &PendingSessionHandshakeError::Eth(
1558                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1559            ),
1560        );
1561
1562        match event!(peers) {
1563            PeerAction::PeerRemoved(peer_id) => {
1564                assert_eq!(peer_id, peer);
1565            }
1566            _ => unreachable!(),
1567        }
1568
1569        poll_fn(|cx| {
1570            assert!(peers.poll(cx).is_pending());
1571            Poll::Ready(())
1572        })
1573        .await;
1574
1575        assert!(!peers.peers.contains_key(&peer));
1576    }
1577
1578    #[tokio::test]
1579    async fn test_ban_on_pending_drop() {
1580        let peer = PeerId::random();
1581        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1582        let mut peers = PeersManager::default();
1583        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1584
1585        match event!(peers) {
1586            PeerAction::PeerAdded(peer_id) => {
1587                assert_eq!(peer_id, peer);
1588            }
1589            _ => unreachable!(),
1590        }
1591        match event!(peers) {
1592            PeerAction::Connect { peer_id, .. } => {
1593                assert_eq!(peer_id, peer);
1594            }
1595            _ => unreachable!(),
1596        }
1597
1598        poll_fn(|cx| {
1599            assert!(peers.poll(cx).is_pending());
1600            Poll::Ready(())
1601        })
1602        .await;
1603
1604        peers.on_outgoing_pending_session_dropped(
1605            &socket_addr,
1606            &peer,
1607            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1608                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1609            )),
1610        );
1611
1612        match event!(peers) {
1613            PeerAction::PeerRemoved(peer_id) => {
1614                assert_eq!(peer_id, peer);
1615            }
1616            _ => unreachable!(),
1617        }
1618        match event!(peers) {
1619            PeerAction::BanPeer { peer_id } => {
1620                assert_eq!(peer_id, peer);
1621            }
1622            _ => unreachable!(),
1623        }
1624
1625        poll_fn(|cx| {
1626            assert!(peers.poll(cx).is_pending());
1627            Poll::Ready(())
1628        })
1629        .await;
1630
1631        assert!(!peers.peers.contains_key(&peer));
1632    }
1633
1634    #[tokio::test]
1635    async fn test_internally_closed_incoming() {
1636        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1637        let mut peers = PeersManager::default();
1638
1639        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1640        assert_eq!(peers.connection_info.num_pending_in, 1);
1641        peers.on_incoming_pending_session_rejected_internally();
1642        assert_eq!(peers.connection_info.num_pending_in, 0);
1643    }
1644
1645    #[tokio::test]
1646    async fn test_reject_incoming_at_pending_capacity() {
1647        let mut peers = PeersManager::default();
1648
1649        for count in 1..=peers.connection_info.config.max_inbound {
1650            let socket_addr =
1651                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1652            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1653            assert_eq!(peers.connection_info.num_pending_in, count);
1654        }
1655        assert!(peers.connection_info.has_in_capacity());
1656        assert!(!peers.connection_info.has_in_pending_capacity());
1657
1658        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1659        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1660    }
1661
1662    #[tokio::test]
1663    async fn test_closed_incoming() {
1664        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1665        let mut peers = PeersManager::default();
1666
1667        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1668        assert_eq!(peers.connection_info.num_pending_in, 1);
1669        peers.on_incoming_pending_session_gracefully_closed();
1670        assert_eq!(peers.connection_info.num_pending_in, 0);
1671    }
1672
1673    #[tokio::test]
1674    async fn test_dropped_incoming() {
1675        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1676        let ban_duration = Duration::from_millis(500);
1677        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1678        let mut peers = PeersManager::new(config);
1679
1680        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1681        assert_eq!(peers.connection_info.num_pending_in, 1);
1682        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1683            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1684                DisconnectReason::UselessPeer,
1685            )),
1686        ));
1687
1688        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1689        assert_eq!(peers.connection_info.num_pending_in, 0);
1690        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1691
1692        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1693
1694        // unbanned after timeout
1695        tokio::time::sleep(ban_duration).await;
1696
1697        poll_fn(|cx| {
1698            let _ = peers.poll(cx);
1699            Poll::Ready(())
1700        })
1701        .await;
1702
1703        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1704        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1705    }
1706
1707    #[tokio::test]
1708    async fn test_reputation_change_connected() {
1709        let peer = PeerId::random();
1710        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1711        let mut peers = PeersManager::default();
1712        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1713
1714        match event!(peers) {
1715            PeerAction::PeerAdded(peer_id) => {
1716                assert_eq!(peer_id, peer);
1717            }
1718            _ => unreachable!(),
1719        }
1720        match event!(peers) {
1721            PeerAction::Connect { peer_id, remote_addr } => {
1722                assert_eq!(peer_id, peer);
1723                assert_eq!(remote_addr, socket_addr);
1724            }
1725            _ => unreachable!(),
1726        }
1727
1728        let p = peers.peers.get_mut(&peer).unwrap();
1729        assert_eq!(p.state, PeerConnectionState::PendingOut);
1730
1731        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1732
1733        let p = peers.peers.get(&peer).unwrap();
1734        assert_eq!(p.state, PeerConnectionState::PendingOut);
1735        assert!(p.is_banned());
1736
1737        peers.on_active_session_gracefully_closed(peer);
1738
1739        let p = peers.peers.get(&peer).unwrap();
1740        assert_eq!(p.state, PeerConnectionState::Idle);
1741        assert!(p.is_banned());
1742
1743        match event!(peers) {
1744            PeerAction::Disconnect { peer_id, .. } => {
1745                assert_eq!(peer_id, peer);
1746            }
1747            _ => unreachable!(),
1748        }
1749    }
1750
1751    #[tokio::test]
1752    async fn accept_incoming_trusted_unknown_peer_address() {
1753        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1754        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1755        // try to connect trusted peer
1756        let trusted = PeerId::random();
1757        peers.add_trusted_peer_id(trusted);
1758
1759        // saturate the inbound slots
1760        for i in 0..peers.connection_info.config.max_inbound {
1761            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1762            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1763            let peer_id = PeerId::random();
1764            peers.on_incoming_session_established(peer_id, addr);
1765
1766            match event!(peers) {
1767                PeerAction::PeerAdded(id) => {
1768                    assert_eq!(id, peer_id);
1769                }
1770                _ => unreachable!(),
1771            }
1772        }
1773
1774        // try to connect untrusted peer
1775        let untrusted = PeerId::random();
1776        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1777        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1778        peers.on_incoming_session_established(untrusted, socket_addr);
1779
1780        match event!(peers) {
1781            PeerAction::PeerAdded(id) => {
1782                assert_eq!(id, untrusted);
1783            }
1784            _ => unreachable!(),
1785        }
1786
1787        match event!(peers) {
1788            PeerAction::Disconnect { peer_id, reason } => {
1789                assert_eq!(peer_id, untrusted);
1790                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1791            }
1792            _ => unreachable!(),
1793        }
1794
1795        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1796        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1797        peers.on_incoming_session_established(trusted, socket_addr);
1798
1799        match event!(peers) {
1800            PeerAction::PeerAdded(id) => {
1801                assert_eq!(id, trusted);
1802            }
1803            _ => unreachable!(),
1804        }
1805
1806        poll_fn(|cx| {
1807            assert!(peers.poll(cx).is_pending());
1808            Poll::Ready(())
1809        })
1810        .await;
1811
1812        let peer = peers.peers.get(&trusted).unwrap();
1813        assert_eq!(peer.state, PeerConnectionState::In);
1814    }
1815
1816    #[tokio::test]
1817    async fn test_already_connected() {
1818        let peer = PeerId::random();
1819        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1820        let mut peers = PeersManager::default();
1821
1822        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
1823        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1824        assert_eq!(peers.connection_info.num_pending_in, 1);
1825
1826        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
1827        // to increase by 1
1828        peers.on_incoming_session_established(peer, socket_addr);
1829        let p = peers.peers.get_mut(&peer).expect("peer not found");
1830        assert_eq!(p.addr.tcp(), socket_addr);
1831        assert_eq!(peers.connection_info.num_pending_in, 0);
1832        assert_eq!(peers.connection_info.num_inbound, 1);
1833
1834        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
1835        // by 1
1836        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1837        assert_eq!(peers.connection_info.num_pending_in, 1);
1838
1839        // Simulate a rejection due to an already established connection, expecting the
1840        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
1841        // should not be changed.
1842        peers.on_already_connected(Direction::Incoming);
1843
1844        let p = peers.peers.get_mut(&peer).expect("peer not found");
1845        assert_eq!(p.addr.tcp(), socket_addr);
1846        assert_eq!(peers.connection_info.num_pending_in, 0);
1847        assert_eq!(peers.connection_info.num_inbound, 1);
1848    }
1849
1850    #[tokio::test]
1851    async fn test_reputation_change_trusted_peer() {
1852        let peer = PeerId::random();
1853        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1854        let mut peers = PeersManager::default();
1855        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
1856
1857        match event!(peers) {
1858            PeerAction::PeerAdded(peer_id) => {
1859                assert_eq!(peer_id, peer);
1860            }
1861            _ => unreachable!(),
1862        }
1863        match event!(peers) {
1864            PeerAction::Connect { peer_id, remote_addr } => {
1865                assert_eq!(peer_id, peer);
1866                assert_eq!(remote_addr, socket_addr);
1867            }
1868            _ => unreachable!(),
1869        }
1870
1871        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
1872        peers.on_active_outgoing_established(peer);
1873        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
1874
1875        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
1876
1877        {
1878            let p = peers.peers.get(&peer).unwrap();
1879            assert_eq!(p.state, PeerConnectionState::Out);
1880            // not banned yet
1881            assert!(!p.is_banned());
1882        }
1883
1884        // ensure peer is banned eventually
1885        loop {
1886            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
1887
1888            let p = peers.peers.get(&peer).unwrap();
1889            if p.is_banned() {
1890                break
1891            }
1892        }
1893
1894        match event!(peers) {
1895            PeerAction::Disconnect { peer_id, .. } => {
1896                assert_eq!(peer_id, peer);
1897            }
1898            _ => unreachable!(),
1899        }
1900    }
1901
1902    #[tokio::test]
1903    async fn test_reputation_management() {
1904        let peer = PeerId::random();
1905        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1906        let mut peers = PeersManager::default();
1907        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1908        assert_eq!(peers.get_reputation(&peer), Some(0));
1909
1910        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
1911        assert_eq!(peers.get_reputation(&peer), Some(1024));
1912
1913        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
1914        assert_eq!(peers.get_reputation(&peer), Some(0));
1915    }
1916
1917    #[tokio::test]
1918    async fn test_remove_discovered_active() {
1919        let peer = PeerId::random();
1920        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1921        let mut peers = PeersManager::default();
1922        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1923
1924        match event!(peers) {
1925            PeerAction::PeerAdded(peer_id) => {
1926                assert_eq!(peer_id, peer);
1927            }
1928            _ => unreachable!(),
1929        }
1930        match event!(peers) {
1931            PeerAction::Connect { peer_id, remote_addr } => {
1932                assert_eq!(peer_id, peer);
1933                assert_eq!(remote_addr, socket_addr);
1934            }
1935            _ => unreachable!(),
1936        }
1937
1938        let p = peers.peers.get(&peer).unwrap();
1939        assert_eq!(p.state, PeerConnectionState::PendingOut);
1940
1941        peers.remove_peer(peer);
1942
1943        match event!(peers) {
1944            PeerAction::PeerRemoved(peer_id) => {
1945                assert_eq!(peer_id, peer);
1946            }
1947            _ => unreachable!(),
1948        }
1949        match event!(peers) {
1950            PeerAction::Disconnect { peer_id, .. } => {
1951                assert_eq!(peer_id, peer);
1952            }
1953            _ => unreachable!(),
1954        }
1955
1956        let p = peers.peers.get(&peer).unwrap();
1957        assert_eq!(p.state, PeerConnectionState::PendingOut);
1958
1959        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1960        let p = peers.peers.get(&peer).unwrap();
1961        assert_eq!(p.state, PeerConnectionState::PendingOut);
1962
1963        peers.on_active_session_gracefully_closed(peer);
1964        assert!(!peers.peers.contains_key(&peer));
1965    }
1966
1967    #[tokio::test]
1968    async fn test_fatal_outgoing_connection_error_trusted() {
1969        let peer = PeerId::random();
1970        let config = PeersConfig::test()
1971            .with_trusted_nodes(vec![TrustedPeer {
1972                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
1973                tcp_port: 8008,
1974                udp_port: 8008,
1975                id: peer,
1976            }])
1977            .with_trusted_nodes_only(true);
1978        let mut peers = PeersManager::new(config);
1979        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
1980
1981        match event!(peers) {
1982            PeerAction::Connect { peer_id, remote_addr } => {
1983                assert_eq!(peer_id, peer);
1984                assert_eq!(remote_addr, socket_addr);
1985            }
1986            _ => unreachable!(),
1987        }
1988
1989        let p = peers.peers.get(&peer).unwrap();
1990        assert_eq!(p.state, PeerConnectionState::PendingOut);
1991
1992        assert_eq!(peers.num_outbound_connections(), 0);
1993
1994        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1995            EthHandshakeError::NonStatusMessageInHandshake,
1996        ));
1997        assert!(err.is_fatal_protocol_error());
1998
1999        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2000        assert_eq!(peers.num_outbound_connections(), 0);
2001
2002        // try tmp ban peer
2003        match event!(peers) {
2004            PeerAction::BanPeer { peer_id } => {
2005                assert_eq!(peer_id, peer);
2006            }
2007            err => unreachable!("{err:?}"),
2008        }
2009
2010        // ensure we still have trusted peer
2011        assert!(peers.peers.contains_key(&peer));
2012
2013        // await for the ban to expire
2014        tokio::time::sleep(peers.backoff_durations.medium).await;
2015
2016        match event!(peers) {
2017            PeerAction::Connect { peer_id, remote_addr } => {
2018                assert_eq!(peer_id, peer);
2019                assert_eq!(remote_addr, socket_addr);
2020            }
2021            err => unreachable!("{err:?}"),
2022        }
2023    }
2024
2025    #[tokio::test]
2026    async fn test_outgoing_connection_error() {
2027        let peer = PeerId::random();
2028        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2029        let mut peers = PeersManager::default();
2030        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2031
2032        match event!(peers) {
2033            PeerAction::PeerAdded(peer_id) => {
2034                assert_eq!(peer_id, peer);
2035            }
2036            _ => unreachable!(),
2037        }
2038        match event!(peers) {
2039            PeerAction::Connect { peer_id, remote_addr } => {
2040                assert_eq!(peer_id, peer);
2041                assert_eq!(remote_addr, socket_addr);
2042            }
2043            _ => unreachable!(),
2044        }
2045
2046        let p = peers.peers.get(&peer).unwrap();
2047        assert_eq!(p.state, PeerConnectionState::PendingOut);
2048
2049        assert_eq!(peers.num_outbound_connections(), 0);
2050
2051        peers.on_outgoing_connection_failure(
2052            &socket_addr,
2053            &peer,
2054            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2055        );
2056
2057        assert_eq!(peers.num_outbound_connections(), 0);
2058    }
2059
2060    #[tokio::test]
2061    async fn test_outgoing_connection_gracefully_closed() {
2062        let peer = PeerId::random();
2063        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2064        let mut peers = PeersManager::default();
2065        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2066
2067        match event!(peers) {
2068            PeerAction::PeerAdded(peer_id) => {
2069                assert_eq!(peer_id, peer);
2070            }
2071            _ => unreachable!(),
2072        }
2073        match event!(peers) {
2074            PeerAction::Connect { peer_id, remote_addr } => {
2075                assert_eq!(peer_id, peer);
2076                assert_eq!(remote_addr, socket_addr);
2077            }
2078            _ => unreachable!(),
2079        }
2080
2081        let p = peers.peers.get(&peer).unwrap();
2082        assert_eq!(p.state, PeerConnectionState::PendingOut);
2083
2084        assert_eq!(peers.num_outbound_connections(), 0);
2085
2086        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2087
2088        assert_eq!(peers.num_outbound_connections(), 0);
2089        assert_eq!(peers.connection_info.num_pending_out, 0);
2090    }
2091
2092    #[tokio::test]
2093    async fn test_discovery_ban_list() {
2094        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2095        let socket_addr = SocketAddr::new(ip, 8008);
2096        let ban_list = BanList::new(vec![], vec![ip]);
2097        let config = PeersConfig::default().with_ban_list(ban_list);
2098        let mut peer_manager = PeersManager::new(config);
2099        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2100
2101        assert!(peer_manager.peers.is_empty());
2102    }
2103
2104    #[tokio::test]
2105    async fn test_on_pending_ban_list() {
2106        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2107        let socket_addr = SocketAddr::new(ip, 8008);
2108        let ban_list = BanList::new(vec![], vec![ip]);
2109        let config = PeersConfig::test().with_ban_list(ban_list);
2110        let mut peer_manager = PeersManager::new(config);
2111        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2112        // because we have no active peers this should be fine for testings
2113        match a {
2114            Ok(_) => panic!(),
2115            Err(err) => match err {
2116                InboundConnectionError::IpBanned {} => {
2117                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2118                }
2119                _ => unreachable!(),
2120            },
2121        }
2122    }
2123
2124    #[tokio::test]
2125    async fn test_on_active_inbound_ban_list() {
2126        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2127        let socket_addr = SocketAddr::new(ip, 8008);
2128        let given_peer_id = PeerId::random();
2129        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2130        let config = PeersConfig::test().with_ban_list(ban_list);
2131        let mut peer_manager = PeersManager::new(config);
2132        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2133        // non-trusted nodes should also increase pending_in
2134        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2135        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2136        // after the connection is established, the peer should be removed, the num_pending_in
2137        // should be decreased, and the num_inbound should not be increased
2138        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2139        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2140
2141        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2142            peer_manager.queued_actions.pop_front()
2143        else {
2144            panic!()
2145        };
2146
2147        assert_eq!(peer_id, given_peer_id)
2148    }
2149
2150    #[test]
2151    fn test_connection_limits() {
2152        let mut info = ConnectionInfo::default();
2153        info.inc_in();
2154        assert_eq!(info.num_inbound, 1);
2155        assert_eq!(info.num_outbound, 0);
2156        assert!(info.has_in_capacity());
2157
2158        info.decr_in();
2159        assert_eq!(info.num_inbound, 0);
2160        assert_eq!(info.num_outbound, 0);
2161
2162        info.inc_out();
2163        assert_eq!(info.num_inbound, 0);
2164        assert_eq!(info.num_outbound, 1);
2165        assert!(info.has_out_capacity());
2166
2167        info.decr_out();
2168        assert_eq!(info.num_inbound, 0);
2169        assert_eq!(info.num_outbound, 0);
2170    }
2171
2172    #[test]
2173    fn test_connection_peer_state() {
2174        let mut info = ConnectionInfo::default();
2175        info.inc_in();
2176
2177        info.decr_state(PeerConnectionState::In);
2178        assert_eq!(info.num_inbound, 0);
2179        assert_eq!(info.num_outbound, 0);
2180
2181        info.inc_out();
2182
2183        info.decr_state(PeerConnectionState::Out);
2184        assert_eq!(info.num_inbound, 0);
2185        assert_eq!(info.num_outbound, 0);
2186    }
2187
2188    #[tokio::test]
2189    async fn test_trusted_peers_are_prioritized() {
2190        let trusted_peer = PeerId::random();
2191        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2192        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2193            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2194            tcp_port: 8008,
2195            udp_port: 8008,
2196            id: trusted_peer,
2197        }]);
2198        let mut peers = PeersManager::new(config);
2199
2200        let basic_peer = PeerId::random();
2201        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2202        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2203
2204        match event!(peers) {
2205            PeerAction::PeerAdded(peer_id) => {
2206                assert_eq!(peer_id, basic_peer);
2207            }
2208            _ => unreachable!(),
2209        }
2210        match event!(peers) {
2211            PeerAction::Connect { peer_id, remote_addr } => {
2212                assert_eq!(peer_id, trusted_peer);
2213                assert_eq!(remote_addr, trusted_sock);
2214            }
2215            _ => unreachable!(),
2216        }
2217        match event!(peers) {
2218            PeerAction::Connect { peer_id, remote_addr } => {
2219                assert_eq!(peer_id, basic_peer);
2220                assert_eq!(remote_addr, basic_sock);
2221            }
2222            _ => unreachable!(),
2223        }
2224    }
2225
2226    #[tokio::test]
2227    async fn test_connect_trusted_nodes_only() {
2228        let trusted_peer = PeerId::random();
2229        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2230        let config = PeersConfig::test()
2231            .with_trusted_nodes(vec![TrustedPeer {
2232                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2233                tcp_port: 8008,
2234                udp_port: 8008,
2235                id: trusted_peer,
2236            }])
2237            .with_trusted_nodes_only(true);
2238        let mut peers = PeersManager::new(config);
2239
2240        let basic_peer = PeerId::random();
2241        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2242        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2243
2244        match event!(peers) {
2245            PeerAction::PeerAdded(peer_id) => {
2246                assert_eq!(peer_id, basic_peer);
2247            }
2248            _ => unreachable!(),
2249        }
2250        match event!(peers) {
2251            PeerAction::Connect { peer_id, remote_addr } => {
2252                assert_eq!(peer_id, trusted_peer);
2253                assert_eq!(remote_addr, trusted_sock);
2254            }
2255            _ => unreachable!(),
2256        }
2257        poll_fn(|cx| {
2258            assert!(peers.poll(cx).is_pending());
2259            Poll::Ready(())
2260        })
2261        .await;
2262    }
2263
2264    #[tokio::test]
2265    async fn test_incoming_with_trusted_nodes_only() {
2266        let trusted_peer = PeerId::random();
2267        let config = PeersConfig::test()
2268            .with_trusted_nodes(vec![TrustedPeer {
2269                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2270                tcp_port: 8008,
2271                udp_port: 8008,
2272                id: trusted_peer,
2273            }])
2274            .with_trusted_nodes_only(true);
2275        let mut peers = PeersManager::new(config);
2276
2277        let basic_peer = PeerId::random();
2278        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2279        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2280        // non-trusted nodes should also increase pending_in
2281        assert_eq!(peers.connection_info.num_pending_in, 1);
2282        peers.on_incoming_session_established(basic_peer, basic_sock);
2283        // after the connection is established, the peer should be removed, the num_pending_in
2284        // should be decreased, and the num_inbound mut not be increased
2285        assert_eq!(peers.connection_info.num_pending_in, 0);
2286        assert_eq!(peers.connection_info.num_inbound, 0);
2287
2288        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2289            peers.queued_actions.pop_front()
2290        else {
2291            panic!()
2292        };
2293        assert_eq!(basic_peer, peer_id);
2294        assert!(!peers.peers.contains_key(&basic_peer));
2295    }
2296
2297    #[tokio::test]
2298    async fn test_incoming_without_trusted_nodes_only() {
2299        let trusted_peer = PeerId::random();
2300        let config = PeersConfig::test()
2301            .with_trusted_nodes(vec![TrustedPeer {
2302                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2303                tcp_port: 8008,
2304                udp_port: 8008,
2305                id: trusted_peer,
2306            }])
2307            .with_trusted_nodes_only(false);
2308        let mut peers = PeersManager::new(config);
2309
2310        let basic_peer = PeerId::random();
2311        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2312        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2313
2314        // non-trusted nodes should also increase pending_in
2315        assert_eq!(peers.connection_info.num_pending_in, 1);
2316        peers.on_incoming_session_established(basic_peer, basic_sock);
2317        // after the connection is established, the peer should be removed, the num_pending_in
2318        // should be decreased, and the num_inbound must be increased
2319        assert_eq!(peers.connection_info.num_pending_in, 0);
2320        assert_eq!(peers.connection_info.num_inbound, 1);
2321        assert!(peers.peers.contains_key(&basic_peer));
2322    }
2323
2324    #[tokio::test]
2325    async fn test_incoming_at_capacity() {
2326        let mut config = PeersConfig::test();
2327        config.connection_info.max_inbound = 1;
2328        let mut peers = PeersManager::new(config);
2329
2330        let peer = PeerId::random();
2331        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2332        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2333
2334        peers.on_incoming_session_established(peer, addr);
2335
2336        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2337        assert_eq!(
2338            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2339            InboundConnectionError::ExceedsCapacity
2340        );
2341    }
2342
2343    #[tokio::test]
2344    async fn test_incoming_rate_limit() {
2345        let config = PeersConfig {
2346            incoming_ip_throttle_duration: Duration::from_millis(100),
2347            ..PeersConfig::test()
2348        };
2349        let mut peers = PeersManager::new(config);
2350
2351        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2352        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2353        assert_eq!(
2354            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2355            InboundConnectionError::IpBanned
2356        );
2357
2358        peers.release_interval.reset_immediately();
2359        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2360
2361        // await unban
2362        poll_fn(|cx| loop {
2363            if peers.poll(cx).is_pending() {
2364                return Poll::Ready(());
2365            }
2366        })
2367        .await;
2368
2369        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2370        assert_eq!(
2371            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2372            InboundConnectionError::IpBanned
2373        );
2374    }
2375
2376    #[tokio::test]
2377    async fn test_tick() {
2378        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2379        let socket_addr = SocketAddr::new(ip, 8008);
2380        let config = PeersConfig::test();
2381        let mut peer_manager = PeersManager::new(config);
2382        let peer_id = PeerId::random();
2383        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2384
2385        tokio::time::sleep(Duration::from_secs(1)).await;
2386        peer_manager.tick();
2387
2388        // still unconnected
2389        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2390
2391        // mark as connected
2392        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2393
2394        tokio::time::sleep(Duration::from_secs(1)).await;
2395        peer_manager.tick();
2396
2397        // still at default reputation
2398        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2399
2400        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2401
2402        tokio::time::sleep(Duration::from_secs(1)).await;
2403        peer_manager.tick();
2404
2405        // tick applied
2406        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2407    }
2408
2409    #[tokio::test]
2410    async fn test_remove_incoming_after_disconnect() {
2411        let peer_id = PeerId::random();
2412        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2413        let mut peers = PeersManager::default();
2414
2415        peers.on_incoming_pending_session(addr.ip()).unwrap();
2416        peers.on_incoming_session_established(peer_id, addr);
2417        let peer = peers.peers.get(&peer_id).unwrap();
2418        assert_eq!(peer.state, PeerConnectionState::In);
2419        assert!(peer.remove_after_disconnect);
2420
2421        peers.on_active_session_gracefully_closed(peer_id);
2422        assert!(!peers.peers.contains_key(&peer_id))
2423    }
2424
2425    #[tokio::test]
2426    async fn test_keep_incoming_after_disconnect_if_discovered() {
2427        let peer_id = PeerId::random();
2428        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2429        let mut peers = PeersManager::default();
2430
2431        peers.on_incoming_pending_session(addr.ip()).unwrap();
2432        peers.on_incoming_session_established(peer_id, addr);
2433        let peer = peers.peers.get(&peer_id).unwrap();
2434        assert_eq!(peer.state, PeerConnectionState::In);
2435        assert!(peer.remove_after_disconnect);
2436
2437        // trigger discovery manually while the peer is still connected
2438        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2439
2440        peers.on_active_session_gracefully_closed(peer_id);
2441
2442        let peer = peers.peers.get(&peer_id).unwrap();
2443        assert_eq!(peer.state, PeerConnectionState::Idle);
2444        assert!(!peer.remove_after_disconnect);
2445    }
2446
2447    #[tokio::test]
2448    async fn test_incoming_outgoing_already_connected() {
2449        let peer_id = PeerId::random();
2450        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2451        let mut peers = PeersManager::default();
2452
2453        peers.on_incoming_pending_session(addr.ip()).unwrap();
2454        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2455
2456        match event!(peers) {
2457            PeerAction::PeerAdded(_) => {}
2458            _ => unreachable!(),
2459        }
2460
2461        match event!(peers) {
2462            PeerAction::Connect { .. } => {}
2463            _ => unreachable!(),
2464        }
2465
2466        peers.on_incoming_session_established(peer_id, addr);
2467        peers.on_already_connected(Direction::Outgoing(peer_id));
2468        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2469        assert_eq!(peers.connection_info.num_inbound, 1);
2470        assert_eq!(peers.connection_info.num_pending_out, 0);
2471        assert_eq!(peers.connection_info.num_pending_in, 0);
2472        assert_eq!(peers.connection_info.num_outbound, 0);
2473    }
2474
2475    #[tokio::test]
2476    async fn test_already_connected_incoming_outgoing_connection_error() {
2477        let peer_id = PeerId::random();
2478        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2479        let mut peers = PeersManager::default();
2480
2481        peers.on_incoming_pending_session(addr.ip()).unwrap();
2482        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2483
2484        match event!(peers) {
2485            PeerAction::PeerAdded(_) => {}
2486            _ => unreachable!(),
2487        }
2488
2489        match event!(peers) {
2490            PeerAction::Connect { .. } => {}
2491            _ => unreachable!(),
2492        }
2493
2494        peers.on_incoming_session_established(peer_id, addr);
2495
2496        peers.on_outgoing_connection_failure(
2497            &addr,
2498            &peer_id,
2499            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2500        );
2501        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2502        assert_eq!(peers.connection_info.num_inbound, 1);
2503        assert_eq!(peers.connection_info.num_pending_out, 0);
2504        assert_eq!(peers.connection_info.num_pending_in, 0);
2505        assert_eq!(peers.connection_info.num_outbound, 0);
2506    }
2507
2508    #[tokio::test]
2509    async fn test_max_concurrent_dials() {
2510        let config = PeersConfig::default();
2511        let mut peer_manager = PeersManager::new(config);
2512        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2513        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2514        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2515            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2516        }
2517
2518        peer_manager.fill_outbound_slots();
2519        let dials = peer_manager
2520            .queued_actions
2521            .iter()
2522            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2523            .count();
2524        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2525    }
2526
2527    #[tokio::test]
2528    async fn test_max_num_of_pending_dials() {
2529        let config = PeersConfig::default();
2530        let mut peer_manager = PeersManager::new(config);
2531        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2532        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2533
2534        // add more peers than allowed
2535        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2536            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2537        }
2538
2539        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2540            match event!(peer_manager) {
2541                PeerAction::PeerAdded(_) => {}
2542                _ => unreachable!(),
2543            }
2544        }
2545
2546        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2547            match event!(peer_manager) {
2548                PeerAction::Connect { .. } => {}
2549                _ => unreachable!(),
2550            }
2551        }
2552
2553        // generate 'Connect' actions
2554        peer_manager.fill_outbound_slots();
2555
2556        // all dialed connections should be in 'PendingOut' state
2557        let dials = peer_manager.connection_info.num_pending_out;
2558        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2559
2560        let num_pendingout_states = peer_manager
2561            .peers
2562            .iter()
2563            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2564            .map(|(peer_id, _)| *peer_id)
2565            .collect::<Vec<PeerId>>();
2566        assert_eq!(
2567            num_pendingout_states.len(),
2568            peer_manager.connection_info.config.max_concurrent_outbound_dials
2569        );
2570
2571        // establish dialed connections
2572        for peer_id in &num_pendingout_states {
2573            peer_manager.on_active_outgoing_established(*peer_id);
2574        }
2575
2576        // all dialed connections should now be in 'Out' state
2577        for peer_id in &num_pendingout_states {
2578            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2579        }
2580
2581        // no more pending outbound connections
2582        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2583    }
2584
2585    #[tokio::test]
2586    async fn test_connect() {
2587        let peer = PeerId::random();
2588        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2589        let mut peers = PeersManager::default();
2590        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2591        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2592
2593        match event!(peers) {
2594            PeerAction::Connect { peer_id, remote_addr } => {
2595                assert_eq!(peer_id, peer);
2596                assert_eq!(remote_addr, socket_addr);
2597            }
2598            _ => unreachable!(),
2599        }
2600
2601        let (record, _) = peers.peer_by_id(peer).unwrap();
2602        assert_eq!(record.tcp_addr(), socket_addr);
2603        assert_eq!(record.udp_addr(), socket_addr);
2604
2605        // connect again
2606        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2607
2608        let (record, _) = peers.peer_by_id(peer).unwrap();
2609        assert_eq!(record.tcp_addr(), socket_addr);
2610        assert_eq!(record.udp_addr(), socket_addr);
2611    }
2612}