1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportOutcome, BlockValidation},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
28 network::{NetworkHandle, NetworkHandleMessage},
29 peers::PeersManager,
30 poll_nested_stream_with_budget,
31 protocol::IntoRlpxSubProtocol,
32 session::SessionManager,
33 state::NetworkState,
34 swarm::{Swarm, SwarmEvent},
35 transactions::NetworkTransactionEvent,
36 FetchClient, NetworkBuilder,
37};
38use futures::{Future, StreamExt};
39use parking_lot::Mutex;
40use reth_eth_wire::{
41 capability::CapabilityMessage, Capabilities, DisconnectReason, EthNetworkPrimitives,
42 NetworkPrimitives,
43};
44use reth_fs_util::{self as fs, FsPathError};
45use reth_metrics::common::mpsc::UnboundedMeteredSender;
46use reth_network_api::{
47 events::{PeerEvent, SessionInfo},
48 test_utils::PeersHandle,
49 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
50};
51use reth_network_peers::{NodeRecord, PeerId};
52use reth_network_types::ReputationChangeKind;
53use reth_storage_api::BlockNumReader;
54use reth_tasks::shutdown::GracefulShutdown;
55use reth_tokio_util::EventSender;
56use secp256k1::SecretKey;
57use std::{
58 net::SocketAddr,
59 path::Path,
60 pin::Pin,
61 sync::{
62 atomic::{AtomicU64, AtomicUsize, Ordering},
63 Arc,
64 },
65 task::{Context, Poll},
66 time::{Duration, Instant},
67};
68use tokio::sync::mpsc::{self, error::TrySendError};
69use tokio_stream::wrappers::UnboundedReceiverStream;
70use tracing::{debug, error, trace, warn};
71
72#[cfg_attr(doc, aquamarine::aquamarine)]
73#[derive(Debug)]
105#[must_use = "The NetworkManager does nothing unless polled"]
106pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
107 swarm: Swarm<N>,
109 handle: NetworkHandle<N>,
111 from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
113 block_import: Box<dyn BlockImport<N::Block>>,
115 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
117 to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
120 to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
134 num_active_peers: Arc<AtomicUsize>,
139 metrics: NetworkMetrics,
141 disconnect_metrics: DisconnectMetrics,
143}
144
145impl<N: NetworkPrimitives> NetworkManager<N> {
147 pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
150 self.to_transactions_manager =
151 Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
152 }
153
154 pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
157 self.to_eth_request_handler = Some(tx);
158 }
159
160 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
162 self.swarm.add_rlpx_sub_protocol(protocol)
163 }
164
165 pub const fn handle(&self) -> &NetworkHandle<N> {
169 &self.handle
170 }
171
172 pub const fn secret_key(&self) -> SecretKey {
174 self.swarm.sessions().secret_key()
175 }
176
177 #[inline]
178 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
179 let metrics = &self.metrics;
180
181 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
182
183 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
185 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
187 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
188 }
189
190 pub async fn new<C: BlockNumReader + 'static>(
195 config: NetworkConfig<C, N>,
196 ) -> Result<Self, NetworkError> {
197 let NetworkConfig {
198 client,
199 secret_key,
200 discovery_v4_addr,
201 mut discovery_v4_config,
202 mut discovery_v5_config,
203 listener_addr,
204 peers_config,
205 sessions_config,
206 chain_id,
207 block_import,
208 network_mode,
209 boot_nodes,
210 executor,
211 hello_message,
212 status,
213 fork_filter,
214 dns_discovery_config,
215 extra_protocols,
216 tx_gossip_disabled,
217 transactions_manager_config: _,
218 nat,
219 } = config;
220
221 let peers_manager = PeersManager::new(peers_config);
222 let peers_handle = peers_manager.handle();
223
224 let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
225 NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
226 })?;
227
228 let listener_addr = incoming.local_address();
230
231 let resolved_boot_nodes =
233 futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;
234
235 if let Some(disc_config) = discovery_v4_config.as_mut() {
236 disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
238 disc_config.add_eip868_pair("eth", status.forkid);
239 }
240
241 if let Some(discv5) = discovery_v5_config.as_mut() {
242 discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
244 }
245
246 let discovery = Discovery::new(
247 listener_addr,
248 discovery_v4_addr,
249 secret_key,
250 discovery_v4_config,
251 discovery_v5_config,
252 dns_discovery_config,
253 )
254 .await?;
255 let local_peer_id = discovery.local_id();
257 let discv4 = discovery.discv4();
258 let discv5 = discovery.discv5();
259
260 let num_active_peers = Arc::new(AtomicUsize::new(0));
261
262 let sessions = SessionManager::new(
263 secret_key,
264 sessions_config,
265 executor,
266 status,
267 hello_message,
268 fork_filter,
269 extra_protocols,
270 );
271
272 let state = NetworkState::new(
273 crate::state::BlockNumReader::new(client),
274 discovery,
275 peers_manager,
276 Arc::clone(&num_active_peers),
277 );
278
279 let swarm = Swarm::new(incoming, sessions, state);
280
281 let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
282
283 let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();
284
285 let handle = NetworkHandle::new(
286 Arc::clone(&num_active_peers),
287 Arc::new(Mutex::new(listener_addr)),
288 to_manager_tx,
289 secret_key,
290 local_peer_id,
291 peers_handle,
292 network_mode,
293 Arc::new(AtomicU64::new(chain_id)),
294 tx_gossip_disabled,
295 discv4,
296 discv5,
297 event_sender.clone(),
298 nat,
299 );
300
301 Ok(Self {
302 swarm,
303 handle,
304 from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
305 block_import,
306 event_sender,
307 to_transactions_manager: None,
308 to_eth_request_handler: None,
309 num_active_peers,
310 metrics: Default::default(),
311 disconnect_metrics: Default::default(),
312 })
313 }
314
315 pub async fn builder<C: BlockNumReader + 'static>(
347 config: NetworkConfig<C, N>,
348 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
349 let network = Self::new(config).await?;
350 Ok(network.into_builder())
351 }
352
353 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
355 NetworkBuilder { network: self, transactions: (), request_handler: () }
356 }
357
358 pub const fn local_addr(&self) -> SocketAddr {
360 self.swarm.listener().local_address()
361 }
362
363 pub fn num_connected_peers(&self) -> usize {
365 self.swarm.state().num_active_peers()
366 }
367
368 pub fn peer_id(&self) -> &PeerId {
370 self.handle.peer_id()
371 }
372
373 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
375 self.swarm.state().peers().iter_peers()
376 }
377
378 pub fn num_known_peers(&self) -> usize {
380 self.swarm.state().peers().num_known_peers()
381 }
382
383 pub fn peers_handle(&self) -> PeersHandle {
387 self.swarm.state().peers().handle()
388 }
389
390 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
393 let known_peers = self.all_peers().collect::<Vec<_>>();
394 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
395 reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
396 Ok(())
397 }
398
399 pub fn fetch_client(&self) -> FetchClient<N> {
403 self.swarm.state().fetch_client()
404 }
405
406 pub fn status(&self) -> NetworkStatus {
408 let sessions = self.swarm.sessions();
409 let status = sessions.status();
410 let hello_message = sessions.hello_message();
411
412 NetworkStatus {
413 client_version: hello_message.client_version,
414 protocol_version: hello_message.protocol_version as u64,
415 eth_protocol_info: EthProtocolInfo {
416 difficulty: status.total_difficulty,
417 head: status.blockhash,
418 network: status.chain.id(),
419 genesis: status.genesis,
420 config: Default::default(),
421 },
422 }
423 }
424
425 fn on_invalid_message(
427 &mut self,
428 peer_id: PeerId,
429 _capabilities: Arc<Capabilities>,
430 _message: CapabilityMessage<N>,
431 ) {
432 trace!(target: "net", ?peer_id, "received unexpected message");
433 self.swarm
434 .state_mut()
435 .peers_mut()
436 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
437 }
438
439 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
442 if let Some(ref tx) = self.to_transactions_manager {
443 let _ = tx.send(event);
444 }
445 }
446
447 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
450 if let Some(ref reqs) = self.to_eth_request_handler {
451 let _ = reqs.try_send(event).map_err(|e| {
452 if let TrySendError::Full(_) = e {
453 debug!(target:"net", "EthRequestHandler channel is full!");
454 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
455 }
456 });
457 }
458 }
459
460 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
462 match req {
463 PeerRequest::GetBlockHeaders { request, response } => {
464 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
465 peer_id,
466 request,
467 response,
468 })
469 }
470 PeerRequest::GetBlockBodies { request, response } => {
471 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
472 peer_id,
473 request,
474 response,
475 })
476 }
477 PeerRequest::GetNodeData { request, response } => {
478 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
479 peer_id,
480 request,
481 response,
482 })
483 }
484 PeerRequest::GetReceipts { request, response } => {
485 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
486 peer_id,
487 request,
488 response,
489 })
490 }
491 PeerRequest::GetPooledTransactions { request, response } => {
492 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
493 peer_id,
494 request,
495 response,
496 });
497 }
498 }
499 }
500
501 fn on_block_import_result(&mut self, outcome: BlockImportOutcome<N::Block>) {
503 let BlockImportOutcome { peer, result } = outcome;
504 match result {
505 Ok(validated_block) => match validated_block {
506 BlockValidation::ValidHeader { block } => {
507 self.swarm.state_mut().update_peer_block(&peer, block.hash, block.number());
508 self.swarm.state_mut().announce_new_block(block);
509 }
510 BlockValidation::ValidBlock { block } => {
511 self.swarm.state_mut().announce_new_block_hash(block);
512 }
513 },
514 Err(_err) => {
515 self.swarm
516 .state_mut()
517 .peers_mut()
518 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
519 }
520 }
521 }
522
523 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
529 where
530 F: FnOnce(&mut Self),
531 {
532 if self.handle.mode().is_stake() {
534 self.swarm
536 .sessions_mut()
537 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
538 } else {
539 only_pow(self);
540 }
541 }
542
543 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
545 match msg {
546 PeerMessage::NewBlockHashes(hashes) => {
547 self.within_pow_or_disconnect(peer_id, |this| {
548 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0)
550 })
551 }
552 PeerMessage::NewBlock(block) => {
553 self.within_pow_or_disconnect(peer_id, move |this| {
554 this.swarm.state_mut().on_new_block(peer_id, block.hash);
555 this.block_import.on_new_block(peer_id, block);
557 });
558 }
559 PeerMessage::PooledTransactions(msg) => {
560 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
561 peer_id,
562 msg,
563 });
564 }
565 PeerMessage::EthRequest(req) => {
566 self.on_eth_request(peer_id, req);
567 }
568 PeerMessage::ReceivedTransaction(msg) => {
569 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
570 peer_id,
571 msg,
572 });
573 }
574 PeerMessage::SendTransactions(_) => {
575 unreachable!("Not emitted by session")
576 }
577 PeerMessage::Other(other) => {
578 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
579 }
580 }
581 }
582
583 fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
585 match msg {
586 NetworkHandleMessage::DiscoveryListener(tx) => {
587 self.swarm.state_mut().discovery_mut().add_listener(tx);
588 }
589 NetworkHandleMessage::AnnounceBlock(block, hash) => {
590 if self.handle.mode().is_stake() {
591 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
593 return
594 }
595 let msg = NewBlockMessage { hash, block: Arc::new(block) };
596 self.swarm.state_mut().announce_new_block(msg);
597 }
598 NetworkHandleMessage::EthRequest { peer_id, request } => {
599 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
600 }
601 NetworkHandleMessage::SendTransaction { peer_id, msg } => {
602 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
603 }
604 NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
605 .swarm
606 .sessions_mut()
607 .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
608 NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
609 self.swarm.state_mut().add_trusted_peer_id(peer_id);
610 }
611 NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
612 if !self.swarm.is_shutting_down() {
614 self.swarm.state_mut().add_peer_kind(peer, kind, addr);
615 }
616 }
617 NetworkHandleMessage::RemovePeer(peer_id, kind) => {
618 self.swarm.state_mut().remove_peer_kind(peer_id, kind);
619 }
620 NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
621 self.swarm.sessions_mut().disconnect(peer_id, reason);
622 }
623 NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
624 self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
625 }
626 NetworkHandleMessage::SetNetworkState(net_state) => {
627 self.swarm.on_network_state_change(net_state);
632 }
633
634 NetworkHandleMessage::Shutdown(tx) => {
635 self.perform_network_shutdown();
636 let _ = tx.send(());
637 }
638 NetworkHandleMessage::ReputationChange(peer_id, kind) => {
639 self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
640 }
641 NetworkHandleMessage::GetReputationById(peer_id, tx) => {
642 let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
643 }
644 NetworkHandleMessage::FetchClient(tx) => {
645 let _ = tx.send(self.fetch_client());
646 }
647 NetworkHandleMessage::GetStatus(tx) => {
648 let _ = tx.send(self.status());
649 }
650 NetworkHandleMessage::StatusUpdate { head } => {
651 if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
652 self.swarm.state_mut().update_fork_id(transition.current);
653 }
654 }
655 NetworkHandleMessage::GetPeerInfos(tx) => {
656 let _ = tx.send(self.get_peer_infos());
657 }
658 NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
659 let _ = tx.send(self.get_peer_info_by_id(peer_id));
660 }
661 NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
662 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
663 }
664 NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
665 let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
666 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
667 }
668 NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
669 NetworkHandleMessage::GetTransactionsHandle(tx) => {
670 if let Some(ref tx_inner) = self.to_transactions_manager {
671 let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
672 } else {
673 let _ = tx.send(None);
674 }
675 }
676 NetworkHandleMessage::EthMessage { peer_id, message } => {
677 self.swarm.sessions_mut().send_message(&peer_id, message)
678 }
679 }
680 }
681
682 fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
683 match event {
685 SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
686 SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => {
687 self.on_invalid_message(peer_id, capabilities, message);
688 self.metrics.invalid_messages_received.increment(1);
689 }
690 SwarmEvent::TcpListenerClosed { remote_addr } => {
691 trace!(target: "net", ?remote_addr, "TCP listener closed.");
692 }
693 SwarmEvent::TcpListenerError(err) => {
694 trace!(target: "net", %err, "TCP connection error.");
695 }
696 SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
697 trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
698 self.metrics.total_incoming_connections.increment(1);
699 self.metrics
700 .incoming_connections
701 .set(self.swarm.state().peers().num_inbound_connections() as f64);
702 }
703 SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
704 trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
705 self.metrics.total_outgoing_connections.increment(1);
706 self.update_pending_connection_metrics()
707 }
708 SwarmEvent::SessionEstablished {
709 peer_id,
710 remote_addr,
711 client_version,
712 capabilities,
713 version,
714 messages,
715 status,
716 direction,
717 } => {
718 let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
719 self.metrics.connected_peers.set(total_active as f64);
720 debug!(
721 target: "net",
722 ?remote_addr,
723 %client_version,
724 ?peer_id,
725 ?total_active,
726 kind=%direction,
727 peer_enode=%NodeRecord::new(remote_addr, peer_id),
728 "Session established"
729 );
730
731 if direction.is_incoming() {
732 self.swarm
733 .state_mut()
734 .peers_mut()
735 .on_incoming_session_established(peer_id, remote_addr);
736 }
737
738 if direction.is_outgoing() {
739 self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
740 }
741
742 self.update_active_connection_metrics();
743
744 let session_info = SessionInfo {
745 peer_id,
746 remote_addr,
747 client_version,
748 capabilities,
749 status,
750 version,
751 };
752
753 self.event_sender
754 .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
755 }
756 SwarmEvent::PeerAdded(peer_id) => {
757 trace!(target: "net", ?peer_id, "Peer added");
758 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
759 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
760 }
761 SwarmEvent::PeerRemoved(peer_id) => {
762 trace!(target: "net", ?peer_id, "Peer dropped");
763 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
764 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
765 }
766 SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
767 let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
768 self.metrics.connected_peers.set(total_active as f64);
769 trace!(
770 target: "net",
771 ?remote_addr,
772 ?peer_id,
773 ?total_active,
774 ?error,
775 "Session disconnected"
776 );
777
778 let mut reason = None;
779 if let Some(ref err) = error {
780 self.swarm.state_mut().peers_mut().on_active_session_dropped(
783 &remote_addr,
784 &peer_id,
785 err,
786 );
787 reason = err.as_disconnected();
788 } else {
789 self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
791 }
792 self.metrics.closed_sessions.increment(1);
793 self.update_active_connection_metrics();
794
795 if let Some(reason) = reason {
796 self.disconnect_metrics.increment(reason);
797 }
798 self.metrics.backed_off_peers.set(
799 self.swarm
800 .state()
801 .peers()
802 .num_backed_off_peers()
803 .saturating_sub(1)
804 as f64,
805 );
806 self.event_sender
807 .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
808 }
809 SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
810 trace!(
811 target: "net",
812 ?remote_addr,
813 ?error,
814 "Incoming pending session failed"
815 );
816
817 if let Some(ref err) = error {
818 self.swarm
819 .state_mut()
820 .peers_mut()
821 .on_incoming_pending_session_dropped(remote_addr, err);
822 self.metrics.pending_session_failures.increment(1);
823 if let Some(reason) = err.as_disconnected() {
824 self.disconnect_metrics.increment(reason);
825 }
826 } else {
827 self.swarm
828 .state_mut()
829 .peers_mut()
830 .on_incoming_pending_session_gracefully_closed();
831 }
832 self.metrics.closed_sessions.increment(1);
833 self.metrics
834 .incoming_connections
835 .set(self.swarm.state().peers().num_inbound_connections() as f64);
836 self.metrics.backed_off_peers.set(
837 self.swarm
838 .state()
839 .peers()
840 .num_backed_off_peers()
841 .saturating_sub(1)
842 as f64,
843 );
844 }
845 SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
846 trace!(
847 target: "net",
848 ?remote_addr,
849 ?peer_id,
850 ?error,
851 "Outgoing pending session failed"
852 );
853
854 if let Some(ref err) = error {
855 self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
856 &remote_addr,
857 &peer_id,
858 err,
859 );
860 self.metrics.pending_session_failures.increment(1);
861 if let Some(reason) = err.as_disconnected() {
862 self.disconnect_metrics.increment(reason);
863 }
864 } else {
865 self.swarm
866 .state_mut()
867 .peers_mut()
868 .on_outgoing_pending_session_gracefully_closed(&peer_id);
869 }
870 self.metrics.closed_sessions.increment(1);
871 self.update_pending_connection_metrics();
872
873 self.metrics.backed_off_peers.set(
874 self.swarm
875 .state()
876 .peers()
877 .num_backed_off_peers()
878 .saturating_sub(1)
879 as f64,
880 );
881 }
882 SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
883 trace!(
884 target: "net",
885 ?remote_addr,
886 ?peer_id,
887 %error,
888 "Outgoing connection error"
889 );
890
891 self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
892 &remote_addr,
893 &peer_id,
894 &error,
895 );
896
897 self.metrics.backed_off_peers.set(
898 self.swarm
899 .state()
900 .peers()
901 .num_backed_off_peers()
902 .saturating_sub(1)
903 as f64,
904 );
905 self.update_pending_connection_metrics();
906 }
907 SwarmEvent::BadMessage { peer_id } => {
908 self.swarm
909 .state_mut()
910 .peers_mut()
911 .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
912 self.metrics.invalid_messages_received.increment(1);
913 }
914 SwarmEvent::ProtocolBreach { peer_id } => {
915 self.swarm
916 .state_mut()
917 .peers_mut()
918 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
919 }
920 }
921 }
922
923 fn get_peer_infos(&self) -> Vec<PeerInfo> {
925 self.swarm
926 .sessions()
927 .active_sessions()
928 .iter()
929 .filter_map(|(&peer_id, session)| {
930 self.swarm
931 .state()
932 .peers()
933 .peer_by_id(peer_id)
934 .map(|(record, kind)| session.peer_info(&record, kind))
935 })
936 .collect()
937 }
938
939 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
943 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
944 self.swarm
945 .state()
946 .peers()
947 .peer_by_id(peer_id)
948 .map(|(record, kind)| session.peer_info(&record, kind))
949 })
950 }
951
952 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
956 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
957 }
958
959 #[inline]
961 fn update_active_connection_metrics(&self) {
962 self.metrics
963 .incoming_connections
964 .set(self.swarm.state().peers().num_inbound_connections() as f64);
965 self.metrics
966 .outgoing_connections
967 .set(self.swarm.state().peers().num_outbound_connections() as f64);
968 }
969
970 #[inline]
972 fn update_pending_connection_metrics(&self) {
973 self.metrics
974 .pending_outgoing_connections
975 .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
976 self.metrics
977 .total_pending_connections
978 .set(self.swarm.sessions().num_pending_connections() as f64);
979 }
980
981 pub async fn run_until_graceful_shutdown<F, R>(
985 mut self,
986 shutdown: GracefulShutdown,
987 shutdown_hook: F,
988 ) -> R
989 where
990 F: FnOnce(Self) -> R,
991 {
992 let mut graceful_guard = None;
993 tokio::select! {
994 _ = &mut self => {},
995 guard = shutdown => {
996 graceful_guard = Some(guard);
997 },
998 }
999
1000 self.perform_network_shutdown();
1001 let res = shutdown_hook(self);
1002 drop(graceful_guard);
1003 res
1004 }
1005
1006 fn perform_network_shutdown(&mut self) {
1009 self.swarm.on_shutdown_requested();
1013 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1015 self.swarm.sessions_mut().disconnect_all_pending();
1017 }
1018}
1019
1020impl<N: NetworkPrimitives> Future for NetworkManager<N> {
1021 type Output = ();
1022
1023 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1024 let start = Instant::now();
1025 let mut poll_durations = NetworkManagerPollDurations::default();
1026
1027 let this = self.get_mut();
1028
1029 while let Poll::Ready(outcome) = this.block_import.poll(cx) {
1031 this.on_block_import_result(outcome);
1032 }
1033
1034 let start_network_handle = Instant::now();
1058 let maybe_more_handle_messages = poll_nested_stream_with_budget!(
1059 "net",
1060 "Network message channel",
1061 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
1062 this.from_handle_rx.poll_next_unpin(cx),
1063 |msg| this.on_handle_message(msg),
1064 error!("Network channel closed");
1065 );
1066 poll_durations.acc_network_handle = start_network_handle.elapsed();
1067
1068 let maybe_more_swarm_events = poll_nested_stream_with_budget!(
1070 "net",
1071 "Swarm events stream",
1072 DEFAULT_BUDGET_TRY_DRAIN_SWARM,
1073 this.swarm.poll_next_unpin(cx),
1074 |event| this.on_swarm_event(event),
1075 );
1076 poll_durations.acc_swarm =
1077 start_network_handle.elapsed() - poll_durations.acc_network_handle;
1078
1079 if maybe_more_handle_messages || maybe_more_swarm_events {
1081 cx.waker().wake_by_ref();
1083 return Poll::Pending
1084 }
1085
1086 this.update_poll_metrics(start, poll_durations);
1087
1088 Poll::Pending
1089 }
1090}
1091
1092#[derive(Debug, Default)]
1093struct NetworkManagerPollDurations {
1094 acc_network_handle: Duration,
1095 acc_swarm: Duration,
1096}