1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
28 network::{NetworkHandle, NetworkHandleMessage},
29 peers::PeersManager,
30 poll_nested_stream_with_budget,
31 protocol::IntoRlpxSubProtocol,
32 session::SessionManager,
33 state::NetworkState,
34 swarm::{Swarm, SwarmEvent},
35 transactions::NetworkTransactionEvent,
36 FetchClient, NetworkBuilder,
37};
38use futures::{Future, StreamExt};
39use parking_lot::Mutex;
40use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
41use reth_fs_util::{self as fs, FsPathError};
42use reth_metrics::common::mpsc::UnboundedMeteredSender;
43use reth_network_api::{
44 events::{PeerEvent, SessionInfo},
45 test_utils::PeersHandle,
46 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
47};
48use reth_network_peers::{NodeRecord, PeerId};
49use reth_network_types::ReputationChangeKind;
50use reth_storage_api::BlockNumReader;
51use reth_tasks::shutdown::GracefulShutdown;
52use reth_tokio_util::EventSender;
53use secp256k1::SecretKey;
54use std::{
55 net::SocketAddr,
56 path::Path,
57 pin::Pin,
58 sync::{
59 atomic::{AtomicU64, AtomicUsize, Ordering},
60 Arc,
61 },
62 task::{Context, Poll},
63 time::{Duration, Instant},
64};
65use tokio::sync::mpsc::{self, error::TrySendError};
66use tokio_stream::wrappers::UnboundedReceiverStream;
67use tracing::{debug, error, trace, warn};
68
69#[cfg_attr(doc, aquamarine::aquamarine)]
70#[derive(Debug)]
102#[must_use = "The NetworkManager does nothing unless polled"]
103pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
104 swarm: Swarm<N>,
106 handle: NetworkHandle<N>,
108 from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
110 block_import: Box<dyn BlockImport<N::Block>>,
112 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
114 to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
117 to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
131 num_active_peers: Arc<AtomicUsize>,
136 metrics: NetworkMetrics,
138 disconnect_metrics: DisconnectMetrics,
140}
141
142impl NetworkManager {
143 pub async fn eth<C: BlockNumReader + 'static>(
155 config: NetworkConfig<C, EthNetworkPrimitives>,
156 ) -> Result<Self, NetworkError> {
157 Self::new(config).await
158 }
159}
160
161impl<N: NetworkPrimitives> NetworkManager<N> {
162 pub fn with_transactions(
165 mut self,
166 tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
167 ) -> Self {
168 self.set_transactions(tx);
169 self
170 }
171
172 pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
175 self.to_transactions_manager =
176 Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
177 }
178
179 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
182 self.set_eth_request_handler(tx);
183 self
184 }
185
186 pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
189 self.to_eth_request_handler = Some(tx);
190 }
191
192 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
194 self.swarm.add_rlpx_sub_protocol(protocol)
195 }
196
197 pub const fn handle(&self) -> &NetworkHandle<N> {
201 &self.handle
202 }
203
204 pub const fn secret_key(&self) -> SecretKey {
206 self.swarm.sessions().secret_key()
207 }
208
209 #[inline]
210 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
211 let metrics = &self.metrics;
212
213 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
214
215 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
217 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
219 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
220 }
221
222 pub async fn new<C: BlockNumReader + 'static>(
227 config: NetworkConfig<C, N>,
228 ) -> Result<Self, NetworkError> {
229 let NetworkConfig {
230 client,
231 secret_key,
232 discovery_v4_addr,
233 mut discovery_v4_config,
234 mut discovery_v5_config,
235 listener_addr,
236 peers_config,
237 sessions_config,
238 chain_id,
239 block_import,
240 network_mode,
241 boot_nodes,
242 executor,
243 hello_message,
244 status,
245 fork_filter,
246 dns_discovery_config,
247 extra_protocols,
248 tx_gossip_disabled,
249 transactions_manager_config: _,
250 nat,
251 handshake,
252 } = config;
253
254 let peers_manager = PeersManager::new(peers_config);
255 let peers_handle = peers_manager.handle();
256
257 let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
258 NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
259 })?;
260
261 let listener_addr = incoming.local_address();
263
264 let resolved_boot_nodes =
266 futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;
267
268 if let Some(disc_config) = discovery_v4_config.as_mut() {
269 disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
271 disc_config.add_eip868_pair("eth", status.forkid);
272 }
273
274 if let Some(discv5) = discovery_v5_config.as_mut() {
275 discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
277 }
278
279 let discovery = Discovery::new(
280 listener_addr,
281 discovery_v4_addr,
282 secret_key,
283 discovery_v4_config,
284 discovery_v5_config,
285 dns_discovery_config,
286 )
287 .await?;
288 let local_peer_id = discovery.local_id();
290 let discv4 = discovery.discv4();
291 let discv5 = discovery.discv5();
292
293 let num_active_peers = Arc::new(AtomicUsize::new(0));
294
295 let sessions = SessionManager::new(
296 secret_key,
297 sessions_config,
298 executor,
299 status,
300 hello_message,
301 fork_filter,
302 extra_protocols,
303 handshake,
304 );
305
306 let state = NetworkState::new(
307 crate::state::BlockNumReader::new(client),
308 discovery,
309 peers_manager,
310 Arc::clone(&num_active_peers),
311 );
312
313 let swarm = Swarm::new(incoming, sessions, state);
314
315 let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
316
317 let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();
318
319 let handle = NetworkHandle::new(
320 Arc::clone(&num_active_peers),
321 Arc::new(Mutex::new(listener_addr)),
322 to_manager_tx,
323 secret_key,
324 local_peer_id,
325 peers_handle,
326 network_mode,
327 Arc::new(AtomicU64::new(chain_id)),
328 tx_gossip_disabled,
329 discv4,
330 discv5,
331 event_sender.clone(),
332 nat,
333 );
334
335 Ok(Self {
336 swarm,
337 handle,
338 from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
339 block_import,
340 event_sender,
341 to_transactions_manager: None,
342 to_eth_request_handler: None,
343 num_active_peers,
344 metrics: Default::default(),
345 disconnect_metrics: Default::default(),
346 })
347 }
348
349 pub async fn builder<C: BlockNumReader + 'static>(
381 config: NetworkConfig<C, N>,
382 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
383 let network = Self::new(config).await?;
384 Ok(network.into_builder())
385 }
386
387 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
389 NetworkBuilder { network: self, transactions: (), request_handler: () }
390 }
391
392 pub const fn local_addr(&self) -> SocketAddr {
394 self.swarm.listener().local_address()
395 }
396
397 pub fn num_connected_peers(&self) -> usize {
399 self.swarm.state().num_active_peers()
400 }
401
402 pub fn peer_id(&self) -> &PeerId {
404 self.handle.peer_id()
405 }
406
407 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
409 self.swarm.state().peers().iter_peers()
410 }
411
412 pub fn num_known_peers(&self) -> usize {
414 self.swarm.state().peers().num_known_peers()
415 }
416
417 pub fn peers_handle(&self) -> PeersHandle {
421 self.swarm.state().peers().handle()
422 }
423
424 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
427 let known_peers = self.all_peers().collect::<Vec<_>>();
428 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
429 reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
430 Ok(())
431 }
432
433 pub fn fetch_client(&self) -> FetchClient<N> {
437 self.swarm.state().fetch_client()
438 }
439
440 pub fn status(&self) -> NetworkStatus {
442 let sessions = self.swarm.sessions();
443 let status = sessions.status();
444 let hello_message = sessions.hello_message();
445
446 #[expect(deprecated)]
447 NetworkStatus {
448 client_version: hello_message.client_version,
449 protocol_version: hello_message.protocol_version as u64,
450 eth_protocol_info: EthProtocolInfo {
451 difficulty: None,
452 head: status.blockhash,
453 network: status.chain.id(),
454 genesis: status.genesis,
455 config: Default::default(),
456 },
457 }
458 }
459
460 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
463 if let Some(ref tx) = self.to_transactions_manager {
464 let _ = tx.send(event);
465 }
466 }
467
468 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
471 if let Some(ref reqs) = self.to_eth_request_handler {
472 let _ = reqs.try_send(event).map_err(|e| {
473 if let TrySendError::Full(_) = e {
474 debug!(target:"net", "EthRequestHandler channel is full!");
475 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
476 }
477 });
478 }
479 }
480
481 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
483 match req {
484 PeerRequest::GetBlockHeaders { request, response } => {
485 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
486 peer_id,
487 request,
488 response,
489 })
490 }
491 PeerRequest::GetBlockBodies { request, response } => {
492 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
493 peer_id,
494 request,
495 response,
496 })
497 }
498 PeerRequest::GetNodeData { request, response } => {
499 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
500 peer_id,
501 request,
502 response,
503 })
504 }
505 PeerRequest::GetReceipts { request, response } => {
506 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
507 peer_id,
508 request,
509 response,
510 })
511 }
512 PeerRequest::GetPooledTransactions { request, response } => {
513 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
514 peer_id,
515 request,
516 response,
517 });
518 }
519 }
520 }
521
522 fn on_block_import_result(&mut self, event: BlockImportEvent<N::Block>) {
524 match event {
525 BlockImportEvent::Announcement(validation) => match validation {
526 BlockValidation::ValidHeader { block } => {
527 self.swarm.state_mut().announce_new_block(block);
528 }
529 BlockValidation::ValidBlock { block } => {
530 self.swarm.state_mut().announce_new_block_hash(block);
531 }
532 },
533 BlockImportEvent::Outcome(outcome) => {
534 let BlockImportOutcome { peer, result } = outcome;
535 match result {
536 Ok(validated_block) => match validated_block {
537 BlockValidation::ValidHeader { block } => {
538 self.swarm.state_mut().update_peer_block(
539 &peer,
540 block.hash,
541 block.number(),
542 );
543 self.swarm.state_mut().announce_new_block(block);
544 }
545 BlockValidation::ValidBlock { block } => {
546 self.swarm.state_mut().announce_new_block_hash(block);
547 }
548 },
549 Err(_err) => {
550 self.swarm
551 .state_mut()
552 .peers_mut()
553 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
554 }
555 }
556 }
557 }
558 }
559
560 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
566 where
567 F: FnOnce(&mut Self),
568 {
569 if self.handle.mode().is_stake() {
571 self.swarm
573 .sessions_mut()
574 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
575 } else {
576 only_pow(self);
577 }
578 }
579
580 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
582 match msg {
583 PeerMessage::NewBlockHashes(hashes) => {
584 self.within_pow_or_disconnect(peer_id, |this| {
585 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
587 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
589 })
590 }
591 PeerMessage::NewBlock(block) => {
592 self.within_pow_or_disconnect(peer_id, move |this| {
593 this.swarm.state_mut().on_new_block(peer_id, block.hash);
594 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
596 });
597 }
598 PeerMessage::PooledTransactions(msg) => {
599 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
600 peer_id,
601 msg,
602 });
603 }
604 PeerMessage::EthRequest(req) => {
605 self.on_eth_request(peer_id, req);
606 }
607 PeerMessage::ReceivedTransaction(msg) => {
608 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
609 peer_id,
610 msg,
611 });
612 }
613 PeerMessage::SendTransactions(_) => {
614 unreachable!("Not emitted by session")
615 }
616 PeerMessage::BlockRangeUpdated(_) => {}
617 PeerMessage::Other(other) => {
618 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
619 }
620 }
621 }
622
623 fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
625 match msg {
626 NetworkHandleMessage::DiscoveryListener(tx) => {
627 self.swarm.state_mut().discovery_mut().add_listener(tx);
628 }
629 NetworkHandleMessage::AnnounceBlock(block, hash) => {
630 if self.handle.mode().is_stake() {
631 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
633 return
634 }
635 let msg = NewBlockMessage { hash, block: Arc::new(block) };
636 self.swarm.state_mut().announce_new_block(msg);
637 }
638 NetworkHandleMessage::EthRequest { peer_id, request } => {
639 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
640 }
641 NetworkHandleMessage::SendTransaction { peer_id, msg } => {
642 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
643 }
644 NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
645 .swarm
646 .sessions_mut()
647 .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
648 NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
649 self.swarm.state_mut().add_trusted_peer_id(peer_id);
650 }
651 NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
652 if !self.swarm.is_shutting_down() {
654 self.swarm.state_mut().add_peer_kind(peer, kind, addr);
655 }
656 }
657 NetworkHandleMessage::RemovePeer(peer_id, kind) => {
658 self.swarm.state_mut().remove_peer_kind(peer_id, kind);
659 }
660 NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
661 self.swarm.sessions_mut().disconnect(peer_id, reason);
662 }
663 NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
664 self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
665 }
666 NetworkHandleMessage::SetNetworkState(net_state) => {
667 self.swarm.on_network_state_change(net_state);
672 }
673
674 NetworkHandleMessage::Shutdown(tx) => {
675 self.perform_network_shutdown();
676 let _ = tx.send(());
677 }
678 NetworkHandleMessage::ReputationChange(peer_id, kind) => {
679 self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
680 }
681 NetworkHandleMessage::GetReputationById(peer_id, tx) => {
682 let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
683 }
684 NetworkHandleMessage::FetchClient(tx) => {
685 let _ = tx.send(self.fetch_client());
686 }
687 NetworkHandleMessage::GetStatus(tx) => {
688 let _ = tx.send(self.status());
689 }
690 NetworkHandleMessage::StatusUpdate { head } => {
691 if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
692 self.swarm.state_mut().update_fork_id(transition.current);
693 }
694 }
695 NetworkHandleMessage::GetPeerInfos(tx) => {
696 let _ = tx.send(self.get_peer_infos());
697 }
698 NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
699 let _ = tx.send(self.get_peer_info_by_id(peer_id));
700 }
701 NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
702 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
703 }
704 NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
705 let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
706 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
707 }
708 NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
709 NetworkHandleMessage::GetTransactionsHandle(tx) => {
710 if let Some(ref tx_inner) = self.to_transactions_manager {
711 let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
712 } else {
713 let _ = tx.send(None);
714 }
715 }
716 NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
717 self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
718 }
719 NetworkHandleMessage::EthMessage { peer_id, message } => {
720 self.swarm.sessions_mut().send_message(&peer_id, message)
721 }
722 }
723 }
724
725 fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
726 match event {
728 SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
729 SwarmEvent::TcpListenerClosed { remote_addr } => {
730 trace!(target: "net", ?remote_addr, "TCP listener closed.");
731 }
732 SwarmEvent::TcpListenerError(err) => {
733 trace!(target: "net", %err, "TCP connection error.");
734 }
735 SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
736 trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
737 self.metrics.total_incoming_connections.increment(1);
738 self.metrics
739 .incoming_connections
740 .set(self.swarm.state().peers().num_inbound_connections() as f64);
741 }
742 SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
743 trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
744 self.metrics.total_outgoing_connections.increment(1);
745 self.update_pending_connection_metrics()
746 }
747 SwarmEvent::SessionEstablished {
748 peer_id,
749 remote_addr,
750 client_version,
751 capabilities,
752 version,
753 messages,
754 status,
755 direction,
756 } => {
757 let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
758 self.metrics.connected_peers.set(total_active as f64);
759 debug!(
760 target: "net",
761 ?remote_addr,
762 %client_version,
763 ?peer_id,
764 ?total_active,
765 kind=%direction,
766 peer_enode=%NodeRecord::new(remote_addr, peer_id),
767 "Session established"
768 );
769
770 if direction.is_incoming() {
771 self.swarm
772 .state_mut()
773 .peers_mut()
774 .on_incoming_session_established(peer_id, remote_addr);
775 }
776
777 if direction.is_outgoing() {
778 self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
779 }
780
781 self.update_active_connection_metrics();
782
783 let peer_kind = self
784 .swarm
785 .state()
786 .peers()
787 .peer_by_id(peer_id)
788 .map(|(_, kind)| kind)
789 .unwrap_or_default();
790 let session_info = SessionInfo {
791 peer_id,
792 remote_addr,
793 client_version,
794 capabilities,
795 status,
796 version,
797 peer_kind,
798 };
799
800 self.event_sender
801 .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
802 }
803 SwarmEvent::PeerAdded(peer_id) => {
804 trace!(target: "net", ?peer_id, "Peer added");
805 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
806 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
807 }
808 SwarmEvent::PeerRemoved(peer_id) => {
809 trace!(target: "net", ?peer_id, "Peer dropped");
810 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
811 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
812 }
813 SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
814 let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
815 self.metrics.connected_peers.set(total_active as f64);
816 trace!(
817 target: "net",
818 ?remote_addr,
819 ?peer_id,
820 ?total_active,
821 ?error,
822 "Session disconnected"
823 );
824
825 let mut reason = None;
826 if let Some(ref err) = error {
827 self.swarm.state_mut().peers_mut().on_active_session_dropped(
830 &remote_addr,
831 &peer_id,
832 err,
833 );
834 reason = err.as_disconnected();
835 } else {
836 self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
838 }
839 self.metrics.closed_sessions.increment(1);
840 self.update_active_connection_metrics();
841
842 if let Some(reason) = reason {
843 self.disconnect_metrics.increment(reason);
844 }
845 self.metrics.backed_off_peers.set(
846 self.swarm
847 .state()
848 .peers()
849 .num_backed_off_peers()
850 .saturating_sub(1)
851 as f64,
852 );
853 self.event_sender
854 .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
855 }
856 SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
857 trace!(
858 target: "net",
859 ?remote_addr,
860 ?error,
861 "Incoming pending session failed"
862 );
863
864 if let Some(ref err) = error {
865 self.swarm
866 .state_mut()
867 .peers_mut()
868 .on_incoming_pending_session_dropped(remote_addr, err);
869 self.metrics.pending_session_failures.increment(1);
870 if let Some(reason) = err.as_disconnected() {
871 self.disconnect_metrics.increment(reason);
872 }
873 } else {
874 self.swarm
875 .state_mut()
876 .peers_mut()
877 .on_incoming_pending_session_gracefully_closed();
878 }
879 self.metrics.closed_sessions.increment(1);
880 self.metrics
881 .incoming_connections
882 .set(self.swarm.state().peers().num_inbound_connections() as f64);
883 self.metrics.backed_off_peers.set(
884 self.swarm
885 .state()
886 .peers()
887 .num_backed_off_peers()
888 .saturating_sub(1)
889 as f64,
890 );
891 }
892 SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
893 trace!(
894 target: "net",
895 ?remote_addr,
896 ?peer_id,
897 ?error,
898 "Outgoing pending session failed"
899 );
900
901 if let Some(ref err) = error {
902 self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
903 &remote_addr,
904 &peer_id,
905 err,
906 );
907 self.metrics.pending_session_failures.increment(1);
908 if let Some(reason) = err.as_disconnected() {
909 self.disconnect_metrics.increment(reason);
910 }
911 } else {
912 self.swarm
913 .state_mut()
914 .peers_mut()
915 .on_outgoing_pending_session_gracefully_closed(&peer_id);
916 }
917 self.metrics.closed_sessions.increment(1);
918 self.update_pending_connection_metrics();
919
920 self.metrics.backed_off_peers.set(
921 self.swarm
922 .state()
923 .peers()
924 .num_backed_off_peers()
925 .saturating_sub(1)
926 as f64,
927 );
928 }
929 SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
930 trace!(
931 target: "net",
932 ?remote_addr,
933 ?peer_id,
934 %error,
935 "Outgoing connection error"
936 );
937
938 self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
939 &remote_addr,
940 &peer_id,
941 &error,
942 );
943
944 self.metrics.backed_off_peers.set(
945 self.swarm
946 .state()
947 .peers()
948 .num_backed_off_peers()
949 .saturating_sub(1)
950 as f64,
951 );
952 self.update_pending_connection_metrics();
953 }
954 SwarmEvent::BadMessage { peer_id } => {
955 self.swarm
956 .state_mut()
957 .peers_mut()
958 .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
959 self.metrics.invalid_messages_received.increment(1);
960 }
961 SwarmEvent::ProtocolBreach { peer_id } => {
962 self.swarm
963 .state_mut()
964 .peers_mut()
965 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
966 }
967 }
968 }
969
970 fn get_peer_infos(&self) -> Vec<PeerInfo> {
972 self.swarm
973 .sessions()
974 .active_sessions()
975 .iter()
976 .filter_map(|(&peer_id, session)| {
977 self.swarm
978 .state()
979 .peers()
980 .peer_by_id(peer_id)
981 .map(|(record, kind)| session.peer_info(&record, kind))
982 })
983 .collect()
984 }
985
986 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
990 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
991 self.swarm
992 .state()
993 .peers()
994 .peer_by_id(peer_id)
995 .map(|(record, kind)| session.peer_info(&record, kind))
996 })
997 }
998
999 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1003 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1004 }
1005
1006 #[inline]
1008 fn update_active_connection_metrics(&self) {
1009 self.metrics
1010 .incoming_connections
1011 .set(self.swarm.state().peers().num_inbound_connections() as f64);
1012 self.metrics
1013 .outgoing_connections
1014 .set(self.swarm.state().peers().num_outbound_connections() as f64);
1015 }
1016
1017 #[inline]
1019 fn update_pending_connection_metrics(&self) {
1020 self.metrics
1021 .pending_outgoing_connections
1022 .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
1023 self.metrics
1024 .total_pending_connections
1025 .set(self.swarm.sessions().num_pending_connections() as f64);
1026 }
1027
1028 pub async fn run_until_graceful_shutdown<F, R>(
1032 mut self,
1033 shutdown: GracefulShutdown,
1034 shutdown_hook: F,
1035 ) -> R
1036 where
1037 F: FnOnce(Self) -> R,
1038 {
1039 let mut graceful_guard = None;
1040 tokio::select! {
1041 _ = &mut self => {},
1042 guard = shutdown => {
1043 graceful_guard = Some(guard);
1044 },
1045 }
1046
1047 self.perform_network_shutdown();
1048 let res = shutdown_hook(self);
1049 drop(graceful_guard);
1050 res
1051 }
1052
1053 fn perform_network_shutdown(&mut self) {
1056 self.swarm.on_shutdown_requested();
1060 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1062 self.swarm.sessions_mut().disconnect_all_pending();
1064 }
1065}
1066
1067impl<N: NetworkPrimitives> Future for NetworkManager<N> {
1068 type Output = ();
1069
1070 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1071 let start = Instant::now();
1072 let mut poll_durations = NetworkManagerPollDurations::default();
1073
1074 let this = self.get_mut();
1075
1076 while let Poll::Ready(outcome) = this.block_import.poll(cx) {
1078 this.on_block_import_result(outcome);
1079 }
1080
1081 let start_network_handle = Instant::now();
1105 let maybe_more_handle_messages = poll_nested_stream_with_budget!(
1106 "net",
1107 "Network message channel",
1108 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
1109 this.from_handle_rx.poll_next_unpin(cx),
1110 |msg| this.on_handle_message(msg),
1111 error!("Network channel closed");
1112 );
1113 poll_durations.acc_network_handle = start_network_handle.elapsed();
1114
1115 let maybe_more_swarm_events = poll_nested_stream_with_budget!(
1117 "net",
1118 "Swarm events stream",
1119 DEFAULT_BUDGET_TRY_DRAIN_SWARM,
1120 this.swarm.poll_next_unpin(cx),
1121 |event| this.on_swarm_event(event),
1122 );
1123 poll_durations.acc_swarm =
1124 start_network_handle.elapsed() - poll_durations.acc_network_handle;
1125
1126 if maybe_more_handle_messages || maybe_more_swarm_events {
1128 cx.waker().wake_by_ref();
1130 return Poll::Pending
1131 }
1132
1133 this.update_poll_metrics(start, poll_durations);
1134
1135 Poll::Pending
1136 }
1137}
1138
/// Time spent in the nested sections of a single `NetworkManager` poll.
///
/// Both fields default to a zero duration.
#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    /// Time spent draining the network handle channel.
    acc_network_handle: Duration,
    /// Time spent draining the swarm event stream.
    acc_swarm: Duration,
}