//! High level network management.
//!
//! The [`NetworkManager`] contains the state of the network as a whole. It controls how connections
//! are handled and keeps track of connections to peers.
//!
//! ## Capabilities
//!
//! The network manages peers depending on the capabilities they announce via their `RLPx`
//! sessions, most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) (`eth`).
//!
//! ## Overview
//!
//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is
//! made up of peer-to-peer connections between nodes that are available on the same network.
//! Peer discovery is handled by Ethereum's discovery protocols (discv4, discv5). If the address
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (TCP) connection is established, both peers start to authenticate an [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake is successful, both peers announce their capabilities and are then ready to exchange sub-protocol messages via the `RLPx` session.
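//!
//! The manager itself is an endless [`Future`](futures::Future) and must be polled to make
//! progress, e.g. by spawning it as a task. A minimal sketch (in practice the manager is usually
//! split into its component tasks first, see [`NetworkManager::builder`]):
//!
//! ```no_run
//! # async fn f() {
//! use reth_chainspec::MAINNET;
//! use reth_network::{NetworkConfig, NetworkManager};
//!
//! let config =
//!     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
//! let manager = NetworkManager::eth(config).await.unwrap();
//! // Drive the network forward by spawning the endless manager future.
//! tokio::spawn(manager);
//! # }
//! ```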

use crate::{
    budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
    config::NetworkConfig,
    discovery::Discovery,
    error::{NetworkError, ServiceKind},
    eth_requests::IncomingEthRequest,
    import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
    listener::ConnectionListener,
    message::{NewBlockMessage, PeerMessage},
    metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
    network::{NetworkHandle, NetworkHandleMessage},
    peers::PeersManager,
    poll_nested_stream_with_budget,
    protocol::IntoRlpxSubProtocol,
    session::SessionManager,
    state::NetworkState,
    swarm::{Swarm, SwarmEvent},
    transactions::NetworkTransactionEvent,
    FetchClient, NetworkBuilder,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
    events::{PeerEvent, SessionInfo},
    test_utils::PeersHandle,
    EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use std::{
    net::SocketAddr,
    path::Path,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};
#[cfg_attr(doc, aquamarine::aquamarine)]
// TODO: Inlined diagram due to a bug in aquamarine library, should become an include when it's
// fixed. See https://github.com/mersinvald/aquamarine/issues/50
// include_mmd!("docs/mermaid/network-manager.mmd")
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
///
/// ```mermaid
/// graph TB
///   handle(NetworkHandle)
///   events(NetworkEvents)
///   transactions(Transactions Task)
///   ethrequest(ETH Request Task)
///   discovery(Discovery Task)
///   subgraph NetworkManager
///     direction LR
///     subgraph Swarm
///         direction TB
///         B1[(Session Manager)]
///         B2[(Connection Listener)]
///         B3[(Network State)]
///     end
///  end
///  handle <--> |request response channel| NetworkManager
///  NetworkManager --> |Network events| events
///  transactions <--> |transactions| NetworkManager
///  ethrequest <--> |ETH request handling| NetworkManager
///  discovery --> |Discovered peers| NetworkManager
/// ```
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The type that manages the actual network part, which includes connections.
    swarm: Swarm<N>,
    /// Underlying network handle that can be shared.
    handle: NetworkHandle<N>,
    /// Receiver half of the command channel set up between this type and the [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Handles block imports according to the `eth` protocol.
    block_import: Box<dyn BlockImport<N::Block>>,
    /// Sender for high level network events.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender half to send events to the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
    to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
    /// Sender half to send events to the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
    ///
    /// The channel that originally receives and bundles all requests from all sessions is already
    /// bounded. However, since handling an eth request is more I/O intensive than delegating
    /// it from the bounded channel to the eth-request channel, it is possible that requests
    /// build up here if the node is flooded with them.
    ///
    /// Even though non-malicious requests are relatively cheap, it's possible to craft
    /// body requests with bogus data up to the allowed max message size limit.
    /// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
    /// requests. This channel size is set at
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY).
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Tracks the number of active sessions (connected peers).
    ///
    /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]:
    /// updated by the `NetworkWorker` and loaded by the `NetworkService`.
    num_active_peers: Arc<AtomicUsize>,
    /// Metrics for the network.
    metrics: NetworkMetrics,
    /// Disconnect metrics for the network.
    disconnect_metrics: DisconnectMetrics,
}

impl NetworkManager {
    /// Creates the manager of a new network with [`EthNetworkPrimitives`] types.
    ///
    /// ```no_run
    /// # async fn f() {
    /// use reth_chainspec::MAINNET;
    /// use reth_network::{NetworkConfig, NetworkManager};
    /// let config =
    ///     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
    /// let manager = NetworkManager::eth(config).await;
    /// # }
    /// ```
    pub async fn eth<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, EthNetworkPrimitives>,
    ) -> Result<Self, NetworkError> {
        Self::new(config).await
    }
}

impl<N: NetworkPrimitives> NetworkManager<N> {
    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
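    ///
    /// A minimal sketch of wiring the channel (the receiving half would normally be handed to a
    /// `TransactionsManager` task):
    ///
    /// ```no_run
    /// # fn f(manager: reth_network::NetworkManager) {
    /// // The manager will forward transaction related network events to `tx`.
    /// let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
    /// let manager = manager.with_transactions(tx);
    /// # let _ = manager;
    /// # }
    /// ```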
    pub fn with_transactions(
        mut self,
        tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
    ) -> Self {
        self.set_transactions(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
    pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
        self.to_transactions_manager =
            Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
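    ///
    /// A minimal sketch; the capacity is an arbitrary illustration, the builder normally uses
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY):
    ///
    /// ```no_run
    /// # fn f(manager: reth_network::NetworkManager) {
    /// // Incoming `eth` requests are delegated over this bounded channel.
    /// let (tx, _rx) = tokio::sync::mpsc::channel(256);
    /// let manager = manager.with_eth_request_handler(tx);
    /// # let _ = manager;
    /// # }
    /// ```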
    pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
        self.set_eth_request_handler(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }

    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }

    /// Returns the [`NetworkHandle`] that can be cloned and shared.
    ///
    /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`].
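    ///
    /// A minimal sketch:
    ///
    /// ```no_run
    /// # fn f(manager: &reth_network::NetworkManager) {
    /// // Clone the handle and move it to another task to interact with the running network.
    /// let handle = manager.handle().clone();
    /// let _local_id = *handle.peer_id();
    /// # }
    /// ```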
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }

    /// Returns the secret key used for authenticating sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.swarm.sessions().secret_key()
    }

    #[inline]
    fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
        let metrics = &self.metrics;

        let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;

        // update metrics for whole poll function
        metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
        // update poll metrics for nested items
        metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
        metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
    }

    /// Creates the manager of a new network.
    ///
    /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
    /// state of the entire network.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
            handshake,
        } = config;

        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // retrieve the tcp address of the socket
        let listener_addr = incoming.local_address();

        // resolve boot nodes
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // merge configured boot nodes
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            disc_config.add_eip868_pair("eth", status.forkid);
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // merge configured boot nodes
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // need to retrieve the addr here since provided port could be `0`
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
        })
    }

    /// Creates a new [`NetworkManager`] instance and returns a [`NetworkBuilder`] to configure all
    /// components of the network.
    ///
    /// ```
    /// use reth_network::{
    ///     config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
    /// };
    /// use reth_network_peers::mainnet_nodes;
    /// use reth_storage_api::noop::NoopProvider;
    /// use reth_transaction_pool::TransactionPool;
    /// async fn launch<Pool: TransactionPool>(pool: Pool) {
    ///     // This block provider implementation is used for testing purposes.
    ///     let client = NoopProvider::default();
    ///
    ///     // The key that's used for encrypting sessions and to identify our node.
    ///     let local_key = rng_secret_key();
    ///
    ///     let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
    ///         .boot_nodes(mainnet_nodes())
    ///         .build(client.clone());
    ///     let transactions_manager_config = config.transactions_manager_config.clone();
    ///
    ///     // create the network instance
    ///     let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
    ///         .await
    ///         .unwrap()
    ///         .transactions(pool, transactions_manager_config)
    ///         .request_handler(client)
    ///         .split_with_handle();
    /// }
    /// ```
    pub async fn builder<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
        let network = Self::new(config).await?;
        Ok(network.into_builder())
    }

    /// Creates a [`NetworkBuilder`] to configure all components of the network.
    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
        NetworkBuilder { network: self, transactions: (), request_handler: () }
    }

    /// Returns the [`SocketAddr`] that listens for incoming tcp connections.
    pub const fn local_addr(&self) -> SocketAddr {
        self.swarm.listener().local_address()
    }

    /// Returns how many peers we're currently connected to.
    pub fn num_connected_peers(&self) -> usize {
        self.swarm.state().num_active_peers()
    }

    /// Returns the [`PeerId`] used in the network.
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }

    /// Returns an iterator over all peers in the peer set.
    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
        self.swarm.state().peers().iter_peers()
    }

    /// Returns the number of peers in the peer set.
    pub fn num_known_peers(&self) -> usize {
        self.swarm.state().peers().num_known_peers()
    }

    /// Returns a new [`PeersHandle`] that can be cloned and shared.
    ///
    /// The [`PeersHandle`] can be used to interact with the network's peer set.
    pub fn peers_handle(&self) -> PeersHandle {
        self.swarm.state().peers().handle()
    }

    /// Collects the peers from the [`NetworkManager`] and writes them to the given
    /// `persistent_peers_file`.
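    ///
    /// A minimal sketch (the file name is purely illustrative):
    ///
    /// ```no_run
    /// # fn f(manager: &reth_network::NetworkManager) -> Result<(), reth_fs_util::FsPathError> {
    /// use std::path::Path;
    /// // Persist all currently known peers as JSON.
    /// manager.write_peers_to_file(Path::new("known-peers.json"))?;
    /// # Ok(())
    /// # }
    /// ```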
    pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
        let known_peers = self.all_peers().collect::<Vec<_>>();
        persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
        reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
        Ok(())
    }

    /// Returns a new [`FetchClient`] that can be cloned and shared.
    ///
    /// The [`FetchClient`] is the entrypoint for sending requests to the network.
    pub fn fetch_client(&self) -> FetchClient<N> {
        self.swarm.state().fetch_client()
    }

    /// Returns the current [`NetworkStatus`] for the local node.
    pub fn status(&self) -> NetworkStatus {
        let sessions = self.swarm.sessions();
        let status = sessions.status();
        let hello_message = sessions.hello_message();

        #[expect(deprecated)]
        NetworkStatus {
            client_version: hello_message.client_version,
            protocol_version: hello_message.protocol_version as u64,
            eth_protocol_info: EthProtocolInfo {
                difficulty: None,
                head: status.blockhash,
                network: status.chain.id(),
                genesis: status.genesis,
                config: Default::default(),
            },
        }
    }

    /// Sends an event to the [`TransactionsManager`](crate::transactions::TransactionsManager) if
    /// configured.
    fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
        if let Some(ref tx) = self.to_transactions_manager {
            let _ = tx.send(event);
        }
    }

    /// Sends an event to the [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) if
    /// configured.
    fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
        if let Some(ref reqs) = self.to_eth_request_handler {
            let _ = reqs.try_send(event).map_err(|e| {
                if let TrySendError::Full(_) = e {
                    debug!(target:"net", "EthRequestHandler channel is full!");
                    self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
                }
            });
        }
    }

    /// Handles an incoming request from the peer.
    fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
        match req {
            PeerRequest::GetBlockHeaders { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetBlockBodies { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetNodeData { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetNodeData {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetPooledTransactions { request, response } => {
                self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
                    peer_id,
                    request,
                    response,
                });
            }
        }
    }

    /// Invoked after a `NewBlock` message from the peer was validated.
    fn on_block_import_result(&mut self, event: BlockImportEvent<N::Block>) {
        match event {
            BlockImportEvent::Announcement(validation) => match validation {
                BlockValidation::ValidHeader { block } => {
                    self.swarm.state_mut().announce_new_block(block);
                }
                BlockValidation::ValidBlock { block } => {
                    self.swarm.state_mut().announce_new_block_hash(block);
                }
            },
            BlockImportEvent::Outcome(outcome) => {
                let BlockImportOutcome { peer, result } = outcome;
                match result {
                    Ok(validated_block) => match validated_block {
                        BlockValidation::ValidHeader { block } => {
                            self.swarm.state_mut().update_peer_block(
                                &peer,
                                block.hash,
                                block.number(),
                            );
                            self.swarm.state_mut().announce_new_block(block);
                        }
                        BlockValidation::ValidBlock { block } => {
                            self.swarm.state_mut().announce_new_block_hash(block);
                        }
                    },
                    Err(_err) => {
                        self.swarm
                            .state_mut()
                            .peers_mut()
                            .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
                    }
                }
            }
        }
    }

    /// Enforces [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) consensus rules for the network protocol.
    ///
    /// Depending on the mode of the network:
    ///    - disconnect the peer if in POS
    ///    - execute the closure if in POW
    fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
    where
        F: FnOnce(&mut Self),
    {
        // reject message in POS
        if self.handle.mode().is_stake() {
            // connections to peers which send invalid messages should be terminated
            self.swarm
                .sessions_mut()
                .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
        } else {
            only_pow(self);
        }
    }

    /// Handles a received message from the peer's session.
    fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
        match msg {
            PeerMessage::NewBlockHashes(hashes) => {
                self.within_pow_or_disconnect(peer_id, |this| {
                    // update peer's state, to track what blocks this peer has seen
                    this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
                    // start block import process for the hashes
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
                })
            }
            PeerMessage::NewBlock(block) => {
                self.within_pow_or_disconnect(peer_id, move |this| {
                    this.swarm.state_mut().on_new_block(peer_id, block.hash);
                    // start block import process
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
                });
            }
            PeerMessage::PooledTransactions(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::EthRequest(req) => {
                self.on_eth_request(peer_id, req);
            }
            PeerMessage::ReceivedTransaction(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::SendTransactions(_) => {
                unreachable!("Not emitted by session")
            }
            PeerMessage::BlockRangeUpdated(_) => {}
            PeerMessage::Other(other) => {
                debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
            }
        }
    }

    /// Handler for received messages from a handle.
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                if self.handle.mode().is_stake() {
                    // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // only add peer if we are not shutting down
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                // Sets the network connection state to Active or Hibernate.
                // Hibernate stops the node from filling new outbound connections, which is
                // beneficial for sync stages that do not require a network connection.
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }

    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        // handle event
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
                }

                self.update_active_connection_metrics();

                let peer_kind = self
                    .swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(_, kind)| kind)
                    .unwrap_or_default();
                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                    peer_kind,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                let mut reason = None;
                if let Some(ref err) = error {
                    // If the connection was closed due to an error, we report
                    // the peer
                    self.swarm.state_mut().peers_mut().on_active_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    reason = err.as_disconnected();
                } else {
                    // Gracefully disconnected
                    self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
                }
                self.metrics.closed_sessions.increment(1);
                self.update_active_connection_metrics();

                if let Some(reason) = reason {
                    self.disconnect_metrics.increment(reason);
                }
                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.metrics.closed_sessions.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.metrics.closed_sessions.increment(1);
                self.update_pending_connection_metrics();

                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.metrics.backed_off_peers.set(
                        self.swarm
                            .state()
                            .peers()
                            .num_backed_off_peers()
                            .saturating_sub(1)
                            as f64,
                    );
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }

    /// Returns [`PeerInfo`] for all connected peers.
    fn get_peer_infos(&self) -> Vec<PeerInfo> {
        self.swarm
            .sessions()
            .active_sessions()
            .iter()
            .filter_map(|(&peer_id, session)| {
                self.swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(record, kind)| session.peer_info(&record, kind))
            })
            .collect()
    }

    /// Returns [`PeerInfo`] for a given peer.
    ///
    /// Returns `None` if there's no active session to the peer.
    fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
        self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
            self.swarm
                .state()
                .peers()
                .peer_by_id(peer_id)
                .map(|(record, kind)| session.peer_info(&record, kind))
        })
    }

    /// Returns [`PeerInfo`] for the given peers.
    ///
    /// Non-active peers are ignored.
    fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
        peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
    }

    /// Updates the metrics for active, established connections.
    #[inline]
    fn update_active_connection_metrics(&self) {
        self.metrics
            .incoming_connections
            .set(self.swarm.state().peers().num_inbound_connections() as f64);
        self.metrics
            .outgoing_connections
            .set(self.swarm.state().peers().num_outbound_connections() as f64);
    }

    /// Updates the metrics for pending connections.
    #[inline]
    fn update_pending_connection_metrics(&self) {
        self.metrics
            .pending_outgoing_connections
            .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
        self.metrics
            .total_pending_connections
            .set(self.swarm.sessions().num_pending_connections() as f64);
    }

    /// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received.
    ///
    /// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard.
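    ///
    /// A minimal sketch (the hook body is purely illustrative):
    ///
    /// ```no_run
    /// # async fn f(
    /// #     manager: reth_network::NetworkManager,
    /// #     shutdown: reth_tasks::shutdown::GracefulShutdown,
    /// # ) {
    /// // Run the network until shutdown is signaled, then inspect the final state in the hook.
    /// let num_peers = manager
    ///     .run_until_graceful_shutdown(shutdown, |manager| manager.num_connected_peers())
    ///     .await;
    /// # let _ = num_peers;
    /// # }
    /// ```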
    pub async fn run_until_graceful_shutdown<F, R>(
        mut self,
        shutdown: GracefulShutdown,
        shutdown_hook: F,
    ) -> R
    where
        F: FnOnce(Self) -> R,
    {
        let mut graceful_guard = None;
        tokio::select! {
            _ = &mut self => {},
            guard = shutdown => {
                graceful_guard = Some(guard);
            },
        }

        self.perform_network_shutdown();
        let res = shutdown_hook(self);
        drop(graceful_guard);
        res
    }

    /// Performs a graceful network shutdown by stopping new connections from being accepted while
    /// draining current and pending connections.
    fn perform_network_shutdown(&mut self) {
        // Set connection status to `Shutdown`. Stops node from accepting
        // new incoming connections as well as sending connection requests to newly
        // discovered nodes.
        self.swarm.on_shutdown_requested();
        // Disconnect all active connections
        self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
        // drop pending connections
        self.swarm.sessions_mut().disconnect_all_pending();
    }
}

impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // poll new block imports (expected to be a noop for POS)
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // These loops drive the entire state of the network and do a lot of work. Under heavy load
        // (many messages/events), data may arrive faster than it can be processed (incoming
        // messages/requests -> events), and it is possible that more data has already arrived by
        // the time an internal event is processed, which could turn this loop into a busy loop.
        // Without yielding back to the executor, it can starve other tasks waiting on that
        // executor to execute them, or to drive underlying resources. To prevent this, we
        // preemptively return control when the `budget` is exhausted. The value itself is chosen
        // somewhat arbitrarily: it is high enough so the swarm can make meaningful progress but
        // low enough that this loop does not starve other tasks for too long. If the budget is
        // exhausted we manually yield back control to the (coop) scheduler. This manual yield
        // point should prevent situations where polling appears to be frozen. See also
        // <https://tokio.rs/blog/2020-04-preemption> and tokio's docs on cooperative scheduling
        // <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>.
        //
        // Testing has shown that this loop naturally reaches the pending state within 1-5
        // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
        // range of what's recommended as rule of thumb.
        // <https://ryhl.io/blog/async-what-is-blocking/>

        // process incoming messages from a handle (`TransactionsManager` has one)
        //
        // will only be closed if the channel was deliberately closed since we always have an
        // instance of `NetworkHandle`
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // process incoming messages from the network
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        // if a budget was exhausted, one of the streams may still have items left; yield back to
        // the executor but make sure we're woken up again immediately
        if maybe_more_handle_messages || maybe_more_swarm_events {
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // all streams are fully drained and import futures pending
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}

#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    acc_network_handle: Duration,
    acc_swarm: Duration,
}