reth_network_api/test_utils/
peers_manager.rs1use std::net::SocketAddr;
5
6use derive_more::Constructor;
7use reth_network_peers::{NodeRecord, PeerId};
8use reth_network_types::{Peer, ReputationChangeKind};
9use tokio::sync::{mpsc, oneshot};
10
11#[auto_impl::auto_impl(&, Arc)]
13pub trait PeersHandleProvider {
14 fn peers_handle(&self) -> &PeersHandle;
18}
19
20#[derive(Clone, Debug, Constructor)]
22pub struct PeersHandle {
23 manager_tx: mpsc::UnboundedSender<PeerCommand>,
25}
26
27impl PeersHandle {
30 fn send(&self, cmd: PeerCommand) {
31 let _ = self.manager_tx.send(cmd);
32 }
33
34 pub fn add_peer(&self, peer_id: PeerId, addr: SocketAddr) {
36 self.send(PeerCommand::Add(peer_id, addr));
37 }
38
39 pub fn remove_peer(&self, peer_id: PeerId) {
41 self.send(PeerCommand::Remove(peer_id));
42 }
43
44 pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
46 self.send(PeerCommand::ReputationChange(peer_id, kind));
47 }
48
49 pub async fn peer_by_id(&self, peer_id: PeerId) -> Option<Peer> {
51 let (tx, rx) = oneshot::channel();
52 self.send(PeerCommand::GetPeer(peer_id, tx));
53
54 rx.await.unwrap_or(None)
55 }
56
57 pub async fn all_peers(&self) -> Vec<NodeRecord> {
59 let (tx, rx) = oneshot::channel();
60 self.send(PeerCommand::GetPeers(tx));
61
62 rx.await.unwrap_or_default()
63 }
64}
65
66#[derive(Debug)]
68pub enum PeerCommand {
69 Add(PeerId, SocketAddr),
71 Remove(PeerId),
75 ReputationChange(PeerId, ReputationChangeKind),
77 GetPeer(PeerId, oneshot::Sender<Option<Peer>>),
79 GetPeers(oneshot::Sender<Vec<NodeRecord>>),
81}