reth_network/manager.rs

//! High level network management.
//!
//! The [`NetworkManager`] contains the state of the network as a whole. It controls how connections
//! are handled and keeps track of connections to peers.
//!
//! ## Capabilities
//!
//! The network manages peers based on the capabilities they announce via their `RLPx` sessions,
//! most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) (`eth`).
//!
//! ## Overview
//!
//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is
//! made up of peer-to-peer connections between nodes that are available on the same network.
//! Peer discovery is handled by Ethereum's discovery protocols (discv4, discv5). If the address
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (TCP) connection is established, both peers authenticate an [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake is successful, both peers announce their capabilities and are then ready to exchange sub-protocol messages via the `RLPx` session.
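//!
//! ## Usage
//!
//! A minimal sketch of driving the manager, assuming a fully configured `NetworkConfig` is in
//! scope (error handling elided):
//!
//! ```ignore
//! let manager = NetworkManager::new(config).await?;
//! // the handle can be cloned and shared before the manager is spawned
//! let handle = manager.handle().clone();
//! // the manager is an endless future and must be polled to make progress
//! tokio::spawn(manager);
//! ```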

use crate::{
    budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
    config::NetworkConfig,
    discovery::Discovery,
    error::{NetworkError, ServiceKind},
    eth_requests::IncomingEthRequest,
    import::{BlockImport, BlockImportOutcome, BlockValidation},
    listener::ConnectionListener,
    message::{NewBlockMessage, PeerMessage},
    metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
    network::{NetworkHandle, NetworkHandleMessage},
    peers::PeersManager,
    poll_nested_stream_with_budget,
    protocol::IntoRlpxSubProtocol,
    session::SessionManager,
    state::NetworkState,
    swarm::{Swarm, SwarmEvent},
    transactions::NetworkTransactionEvent,
    FetchClient, NetworkBuilder,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
    capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives,
    NetworkPrimitives,
};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
    events::{PeerEvent, SessionInfo},
    test_utils::PeersHandle,
    EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use std::{
    net::SocketAddr,
    path::Path,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};

#[cfg_attr(doc, aquamarine::aquamarine)]
// TODO: Inlined diagram due to a bug in aquamarine library, should become an include when it's
// fixed. See https://github.com/mersinvald/aquamarine/issues/50
// include_mmd!("docs/mermaid/network-manager.mmd")
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
///
/// ```mermaid
/// graph TB
///   handle(NetworkHandle)
///   events(NetworkEvents)
///   transactions(Transactions Task)
///   ethrequest(ETH Request Task)
///   discovery(Discovery Task)
///   subgraph NetworkManager
///     direction LR
///     subgraph Swarm
///         direction TB
///         B1[(Session Manager)]
///         B2[(Connection Listener)]
///         B3[(Network State)]
///     end
///  end
///  handle <--> |request response channel| NetworkManager
///  NetworkManager --> |Network events| events
///  transactions <--> |transactions| NetworkManager
///  ethrequest <--> |ETH request handling| NetworkManager
///  discovery --> |Discovered peers| NetworkManager
/// ```
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The type that manages the actual network part, which includes connections.
    swarm: Swarm<N>,
    /// Underlying network handle that can be shared.
    handle: NetworkHandle<N>,
    /// Receiver half of the command channel set up between this type and the [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Handles block imports according to the `eth` protocol.
    block_import: Box<dyn BlockImport<N::Block>>,
    /// Sender for high level network events.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender half to send events to the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
    to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
    /// Sender half to send events to the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
    ///
    /// The channel that originally receives and bundles all requests from all sessions is already
    /// bounded. However, since handling eth requests is more I/O intensive than delegating
    /// them from the bounded channel to the eth-request channel, it is possible that this
    /// builds up if the node is flooded with requests.
    ///
    /// Even though non-malicious requests are relatively cheap, it's possible to craft
    /// body requests with bogus data up to the allowed max message size limit.
    /// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
    /// requests. This channel size is set at
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY).
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Tracks the number of active sessions (connected peers).
    ///
    /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]:
    /// updated by the `NetworkWorker` and loaded by the `NetworkService`.
    num_active_peers: Arc<AtomicUsize>,
    /// Metrics for the Network
    metrics: NetworkMetrics,
    /// Disconnect metrics for the Network
    disconnect_metrics: DisconnectMetrics,
}

// === impl NetworkManager ===
impl<N: NetworkPrimitives> NetworkManager<N> {
    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
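    ///
    /// A minimal sketch of wiring this channel up (the receiver half is assumed to be handed to
    /// the `TransactionsManager` task; `manager` is illustrative):
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    /// manager.set_transactions(tx);
    /// // `rx` is then consumed by the TransactionsManager task
    /// ```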
    pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
        self.to_transactions_manager =
            Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }

    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }

    /// Returns the [`NetworkHandle`] that can be cloned and shared.
    ///
    /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }

    /// Returns the secret key used for authenticating sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.swarm.sessions().secret_key()
    }

    #[inline]
    fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
        let metrics = &self.metrics;

        let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;

        // update metrics for whole poll function
        metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
        // update poll metrics for nested items
        metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
        metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
    }

    /// Creates the manager of a new network.
    ///
    /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
    /// state of the entire network.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
        } = config;

        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // retrieve the tcp address of the socket
        let listener_addr = incoming.local_address();

        // resolve boot nodes
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // merge configured boot nodes
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            disc_config.add_eip868_pair("eth", status.forkid);
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // merge configured boot nodes
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // need to retrieve the addr here since provided port could be `0`
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
        })
    }

    /// Create a new [`NetworkManager`] instance and start a [`NetworkBuilder`] to configure all
    /// components of the network
    ///
    /// ```
    /// use reth_network::{
    ///     config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
    /// };
    /// use reth_network_peers::mainnet_nodes;
    /// use reth_storage_api::noop::NoopProvider;
    /// use reth_transaction_pool::TransactionPool;
    /// async fn launch<Pool: TransactionPool>(pool: Pool) {
    ///     // This block provider implementation is used for testing purposes.
    ///     let client = NoopProvider::default();
    ///
    ///     // The key that's used for encrypting sessions and to identify our node.
    ///     let local_key = rng_secret_key();
    ///
    ///     let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
    ///         .boot_nodes(mainnet_nodes())
    ///         .build(client.clone());
    ///     let transactions_manager_config = config.transactions_manager_config.clone();
    ///
    ///     // create the network instance
    ///     let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
    ///         .await
    ///         .unwrap()
    ///         .transactions(pool, transactions_manager_config)
    ///         .request_handler(client)
    ///         .split_with_handle();
    /// }
    /// ```
    pub async fn builder<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
        let network = Self::new(config).await?;
        Ok(network.into_builder())
    }

    /// Create a [`NetworkBuilder`] to configure all components of the network
    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
        NetworkBuilder { network: self, transactions: (), request_handler: () }
    }

    /// Returns the [`SocketAddr`] that listens for incoming tcp connections.
    pub const fn local_addr(&self) -> SocketAddr {
        self.swarm.listener().local_address()
    }

    /// How many peers we're currently connected to.
    pub fn num_connected_peers(&self) -> usize {
        self.swarm.state().num_active_peers()
    }

    /// Returns the [`PeerId`] used in the network.
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }

    /// Returns an iterator over all peers in the peer set.
    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
        self.swarm.state().peers().iter_peers()
    }

    /// Returns the number of peers in the peer set.
    pub fn num_known_peers(&self) -> usize {
        self.swarm.state().peers().num_known_peers()
    }

    /// Returns a new [`PeersHandle`] that can be cloned and shared.
    ///
    /// The [`PeersHandle`] can be used to interact with the network's peer set.
    pub fn peers_handle(&self) -> PeersHandle {
        self.swarm.state().peers().handle()
    }

    /// Collect the peers from the [`NetworkManager`] and write them to the given
    /// `persistent_peers_file`.
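    ///
    /// A minimal sketch, assuming a `manager: NetworkManager` is in scope (the file name is
    /// illustrative):
    ///
    /// ```ignore
    /// manager.write_peers_to_file(Path::new("known-peers.json"))?;
    /// ```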
    pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
        let known_peers = self.all_peers().collect::<Vec<_>>();
        persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
        reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
        Ok(())
    }

    /// Returns a new [`FetchClient`] that can be cloned and shared.
    ///
    /// The [`FetchClient`] is the entrypoint for sending requests to the network.
    pub fn fetch_client(&self) -> FetchClient<N> {
        self.swarm.state().fetch_client()
    }

    /// Returns the current [`NetworkStatus`] for the local node.
    pub fn status(&self) -> NetworkStatus {
        let sessions = self.swarm.sessions();
        let status = sessions.status();
        let hello_message = sessions.hello_message();

        NetworkStatus {
            client_version: hello_message.client_version,
            protocol_version: hello_message.protocol_version as u64,
            eth_protocol_info: EthProtocolInfo {
                difficulty: status.total_difficulty,
                head: status.blockhash,
                network: status.chain.id(),
                genesis: status.genesis,
                config: Default::default(),
            },
        }
    }

    /// Event hook for an unexpected message from the peer.
    fn on_invalid_message(
        &mut self,
        peer_id: PeerId,
        _capabilities: Arc<Capabilities>,
        _message: CapabilityMessage<N>,
    ) {
        trace!(target: "net", ?peer_id, "received unexpected message");
        self.swarm
            .state_mut()
            .peers_mut()
            .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
    }

    /// Sends an event to the [`TransactionsManager`](crate::transactions::TransactionsManager) if
    /// configured.
    fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
        if let Some(ref tx) = self.to_transactions_manager {
            let _ = tx.send(event);
        }
    }

    /// Sends an event to the [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) if
    /// configured.
    fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
        if let Some(ref reqs) = self.to_eth_request_handler {
            let _ = reqs.try_send(event).map_err(|e| {
                if let TrySendError::Full(_) = e {
                    debug!(target:"net", "EthRequestHandler channel is full!");
                    self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
                }
            });
        }
    }

    /// Handle an incoming request from the peer
    fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
        match req {
            PeerRequest::GetBlockHeaders { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetBlockBodies { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetNodeData { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetNodeData {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetPooledTransactions { request, response } => {
                self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
                    peer_id,
                    request,
                    response,
                });
            }
        }
    }

    /// Invoked after a `NewBlock` message from the peer was validated
    fn on_block_import_result(&mut self, outcome: BlockImportOutcome<N::Block>) {
        let BlockImportOutcome { peer, result } = outcome;
        match result {
            Ok(validated_block) => match validated_block {
                BlockValidation::ValidHeader { block } => {
                    self.swarm.state_mut().update_peer_block(&peer, block.hash, block.number());
                    self.swarm.state_mut().announce_new_block(block);
                }
                BlockValidation::ValidBlock { block } => {
                    self.swarm.state_mut().announce_new_block_hash(block);
                }
            },
            Err(_err) => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
            }
        }
    }

    /// Enforces [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) consensus rules for the network protocol
    ///
    /// Depending on the mode of the network:
    ///    - disconnect peer if in POS
    ///    - execute the closure if in POW
    fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
    where
        F: FnOnce(&mut Self),
    {
        // reject message in POS
        if self.handle.mode().is_stake() {
            // connections to peers which send invalid messages should be terminated
            self.swarm
                .sessions_mut()
                .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
        } else {
            only_pow(self);
        }
    }

    /// Handles a received message from the peer's session.
    fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
        match msg {
            PeerMessage::NewBlockHashes(hashes) => {
                self.within_pow_or_disconnect(peer_id, |this| {
                    // update peer's state, to track what blocks this peer has seen
                    this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0)
                })
            }
            PeerMessage::NewBlock(block) => {
                self.within_pow_or_disconnect(peer_id, move |this| {
                    this.swarm.state_mut().on_new_block(peer_id, block.hash);
                    // start block import process
                    this.block_import.on_new_block(peer_id, block);
                });
            }
            PeerMessage::PooledTransactions(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::EthRequest(req) => {
                self.on_eth_request(peer_id, req);
            }
            PeerMessage::ReceivedTransaction(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::SendTransactions(_) => {
                unreachable!("Not emitted by session")
            }
            PeerMessage::Other(other) => {
                debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
            }
        }
    }

    /// Handler for received messages from a handle
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                if self.handle.mode().is_stake() {
                    // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // only add peer if we are not shutting down
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                // Sets the network connection state to either Active or Hibernate.
                // Hibernation stops the node from filling new outbound connections,
                // which is beneficial for sync stages that do not require a network
                // connection.
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }

    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        // handle event
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => {
                self.on_invalid_message(peer_id, capabilities, message);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
                }

                self.update_active_connection_metrics();

                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                let mut reason = None;
                if let Some(ref err) = error {
                    // If the connection was closed due to an error, we report
                    // the peer
                    self.swarm.state_mut().peers_mut().on_active_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    reason = err.as_disconnected();
                } else {
                    // Gracefully disconnected
                    self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
                }
                self.metrics.closed_sessions.increment(1);
                self.update_active_connection_metrics();

                if let Some(reason) = reason {
                    self.disconnect_metrics.increment(reason);
                }
                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.metrics.closed_sessions.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.metrics.closed_sessions.increment(1);
                self.update_pending_connection_metrics();

                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }

    /// Returns [`PeerInfo`] for all connected peers
    fn get_peer_infos(&self) -> Vec<PeerInfo> {
        self.swarm
            .sessions()
            .active_sessions()
            .iter()
            .filter_map(|(&peer_id, session)| {
                self.swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(record, kind)| session.peer_info(&record, kind))
            })
            .collect()
    }

    /// Returns [`PeerInfo`] for a given peer.
    ///
    /// Returns `None` if there's no active session to the peer.
    fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
        self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
            self.swarm
                .state()
                .peers()
                .peer_by_id(peer_id)
                .map(|(record, kind)| session.peer_info(&record, kind))
        })
    }

    /// Returns [`PeerInfo`] for the given peers.
    ///
    /// Non-active peers are ignored.
    fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
        peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
    }

    /// Updates the metrics for active, established connections
    #[inline]
    fn update_active_connection_metrics(&self) {
        self.metrics
            .incoming_connections
            .set(self.swarm.state().peers().num_inbound_connections() as f64);
        self.metrics
            .outgoing_connections
            .set(self.swarm.state().peers().num_outbound_connections() as f64);
    }

    /// Updates the metrics for pending connections
    #[inline]
    fn update_pending_connection_metrics(&self) {
        self.metrics
            .pending_outgoing_connections
            .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
        self.metrics
            .total_pending_connections
            .set(self.swarm.sessions().num_pending_connections() as f64);
    }

    /// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received.
    ///
    /// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard.
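    ///
    /// A minimal usage sketch; the `shutdown` signal is assumed to come from the node's task
    /// manager, and the hook here merely snapshots the known peers (names are illustrative):
    ///
    /// ```ignore
    /// let known_peers = manager
    ///     .run_until_graceful_shutdown(shutdown, |manager| manager.all_peers().collect::<Vec<_>>())
    ///     .await;
    /// ```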
    pub async fn run_until_graceful_shutdown<F, R>(
        mut self,
        shutdown: GracefulShutdown,
        shutdown_hook: F,
    ) -> R
    where
        F: FnOnce(Self) -> R,
    {
        let mut graceful_guard = None;
        tokio::select! {
            _ = &mut self => {},
            guard = shutdown => {
                graceful_guard = Some(guard);
            },
        }

        self.perform_network_shutdown();
        let res = shutdown_hook(self);
        drop(graceful_guard);
        res
    }

    /// Performs a graceful network shutdown by stopping new connections from being accepted while
    /// draining current and pending connections.
    fn perform_network_shutdown(&mut self) {
        // Set connection status to `Shutdown`. Stops the node from accepting
        // new incoming connections as well as sending connection requests to newly
        // discovered nodes.
        self.swarm.on_shutdown_requested();
        // Disconnect all active connections
        self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
        // drop pending connections
        self.swarm.sessions_mut().disconnect_all_pending();
    }
}

impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // poll new block imports (expected to be a noop for POS)
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // These loops drive the entire state of the network and do a lot of work. Under heavy load
        // (many messages/events), data may arrive faster than it can be processed (incoming
        // messages/requests -> events), and it is possible that more data has already arrived by
        // the time an internal event is processed, which could turn this loop into a busy loop.
        // Without yielding back to the executor, it can starve other tasks waiting on that
        // executor to execute them, or drive underlying resources. To prevent this, we
        // preemptively return control when the `budget` is exhausted. The value itself is chosen
        // somewhat arbitrarily: it is high enough so the swarm can make meaningful progress but
        // low enough that this loop does not starve other tasks for too long. If the budget is
        // exhausted we manually yield back control to the (coop) scheduler. This manual yield
        // point should prevent situations where polling appears to be frozen. See also
        // <https://tokio.rs/blog/2020-04-preemption> and tokio's docs on cooperative scheduling
        // <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>
        //
        // Testing has shown that this loop naturally reaches the pending state within 1-5
        // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
        // range of what's recommended as a rule of thumb.
        // <https://ryhl.io/blog/async-what-is-blocking/>

        // process incoming messages from a handle (`TransactionsManager` has one)
        //
        // will only be closed if the channel was deliberately closed since we always have an
        // instance of `NetworkHandle`
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // process incoming messages from the network
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        // if a budget was exhausted before its stream was fully drained, request an immediate
        // wake-up so the remaining work is processed on the next poll
        if maybe_more_handle_messages || maybe_more_swarm_events {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}

#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    acc_network_handle: Duration,
    acc_swarm: Duration,
}