reth_network/test_utils/
testnet.rs

1//! A network implementation for testing purposes.
2
3use crate::{
4    builder::ETH_REQUEST_CHANNEL_CAPACITY,
5    error::NetworkError,
6    eth_requests::EthRequestHandler,
7    protocol::IntoRlpxSubProtocol,
8    transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
9    NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
10};
11use futures::{FutureExt, StreamExt};
12use pin_project::pin_project;
13use reth_chainspec::{ChainSpecProvider, Hardforks, MAINNET};
14use reth_eth_wire::{
15    protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
16};
17use reth_network_api::{
18    events::{PeerEvent, SessionInfo},
19    test_utils::{PeersHandle, PeersHandleProvider},
20    NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
21};
22use reth_network_peers::PeerId;
23use reth_primitives::{PooledTransactionsElement, TransactionSigned};
24use reth_storage_api::{
25    noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
26};
27use reth_tasks::TokioTaskExecutor;
28use reth_tokio_util::EventStream;
29use reth_transaction_pool::{
30    blobstore::InMemoryBlobStore,
31    test_utils::{TestPool, TestPoolBuilder},
32    EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
33};
34use secp256k1::SecretKey;
35use std::{
36    fmt,
37    future::Future,
38    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
39    pin::Pin,
40    task::{Context, Poll},
41};
42use tokio::{
43    sync::{
44        mpsc::{channel, unbounded_channel},
45        oneshot,
46    },
47    task::JoinHandle,
48};
49
/// A test network consisting of multiple peers.
///
/// `C` is the client type every peer is created with and `Pool` is the type of the transaction
/// pool that can be installed on the peers.
///
/// The network is itself a [`Future`] that polls all of its peers and never resolves, see also
/// [`Testnet::spawn`].
pub struct Testnet<C, Pool> {
    /// All running peers in the network.
    peers: Vec<Peer<C, Pool>>,
}
55
56// === impl Testnet ===
57
58impl<C> Testnet<C, TestPool>
59where
60    C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
61{
62    /// Same as [`Self::try_create_with`] but panics on error
63    pub async fn create_with(num_peers: usize, provider: C) -> Self {
64        Self::try_create_with(num_peers, provider).await.unwrap()
65    }
66
67    /// Creates a new [`Testnet`] with the given number of peers and the provider.
68    pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
69        let mut this = Self { peers: Vec::with_capacity(num_peers) };
70        for _ in 0..num_peers {
71            let config = PeerConfig::new(provider.clone());
72            this.add_peer_with_config(config).await?;
73        }
74        Ok(this)
75    }
76
77    /// Extend the list of peers with new peers that are configured with each of the given
78    /// [`PeerConfig`]s.
79    pub async fn extend_peer_with_config(
80        &mut self,
81        configs: impl IntoIterator<Item = PeerConfig<C>>,
82    ) -> Result<(), NetworkError> {
83        let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
84        let peers = futures::future::join_all(peers).await;
85        for peer in peers {
86            self.peers.push(peer?);
87        }
88        Ok(())
89    }
90}
91
92impl<C, Pool> Testnet<C, Pool>
93where
94    C: BlockReader + HeaderProvider + Clone + 'static,
95    Pool: TransactionPool,
96{
97    /// Return a mutable slice of all peers.
98    pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
99        &mut self.peers
100    }
101
102    /// Return a slice of all peers.
103    pub fn peers(&self) -> &[Peer<C, Pool>] {
104        &self.peers
105    }
106
107    /// Remove a peer from the [`Testnet`] and return it.
108    ///
109    /// # Panics
110    /// If the index is out of bounds.
111    pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
112        self.peers.remove(index)
113    }
114
115    /// Return a mutable iterator over all peers.
116    pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
117        self.peers.iter_mut()
118    }
119
120    /// Return an iterator over all peers.
121    pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
122        self.peers.iter()
123    }
124
125    /// Add a peer to the [`Testnet`] with the given [`PeerConfig`].
126    pub async fn add_peer_with_config(
127        &mut self,
128        config: PeerConfig<C>,
129    ) -> Result<(), NetworkError> {
130        let PeerConfig { config, client, secret_key } = config;
131
132        let network = NetworkManager::new(config).await?;
133        let peer = Peer {
134            network,
135            client,
136            secret_key,
137            request_handler: None,
138            transactions_manager: None,
139            pool: None,
140        };
141        self.peers.push(peer);
142        Ok(())
143    }
144
145    /// Returns all handles to the networks
146    pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
147        self.peers.iter().map(|p| p.handle())
148    }
149
150    /// Maps the pool of each peer with the given closure
151    pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
152    where
153        F: Fn(Peer<C, Pool>) -> Peer<C, P>,
154        P: TransactionPool,
155    {
156        Testnet { peers: self.peers.into_iter().map(f).collect() }
157    }
158
159    /// Apply a closure on each peer
160    pub fn for_each<F>(&self, f: F)
161    where
162        F: Fn(&Peer<C, Pool>),
163    {
164        self.peers.iter().for_each(f)
165    }
166
167    /// Apply a closure on each peer
168    pub fn for_each_mut<F>(&mut self, f: F)
169    where
170        F: FnMut(&mut Peer<C, Pool>),
171    {
172        self.peers.iter_mut().for_each(f)
173    }
174}
175
176impl<C, Pool> Testnet<C, Pool>
177where
178    C: StateProviderFactory + BlockReaderIdExt + HeaderProvider + Clone + 'static,
179    Pool: TransactionPool,
180{
181    /// Installs an eth pool on each peer
182    pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
183        self.map_pool(|peer| {
184            let blob_store = InMemoryBlobStore::default();
185            let pool = TransactionValidationTaskExecutor::eth(
186                peer.client.clone(),
187                MAINNET.clone(),
188                blob_store.clone(),
189                TokioTaskExecutor::default(),
190            );
191            peer.map_transactions_manager(EthTransactionPool::eth_pool(
192                pool,
193                blob_store,
194                Default::default(),
195            ))
196        })
197    }
198
199    /// Installs an eth pool on each peer with custom transaction manager config
200    pub fn with_eth_pool_config(
201        self,
202        tx_manager_config: TransactionsManagerConfig,
203    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
204        self.map_pool(|peer| {
205            let blob_store = InMemoryBlobStore::default();
206            let pool = TransactionValidationTaskExecutor::eth(
207                peer.client.clone(),
208                MAINNET.clone(),
209                blob_store.clone(),
210                TokioTaskExecutor::default(),
211            );
212
213            peer.map_transactions_manager_with_config(
214                EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
215                tx_manager_config.clone(),
216            )
217        })
218    }
219}
220
221impl<C, Pool> Testnet<C, Pool>
222where
223    C: BlockReader<
224            Block = reth_primitives::Block,
225            Receipt = reth_primitives::Receipt,
226            Header = reth_primitives::Header,
227        > + HeaderProvider
228        + Clone
229        + Unpin
230        + 'static,
231    Pool: TransactionPool<
232            Transaction: PoolTransaction<
233                Consensus = TransactionSigned,
234                Pooled = PooledTransactionsElement,
235            >,
236        > + Unpin
237        + 'static,
238{
239    /// Spawns the testnet to a separate task
240    pub fn spawn(self) -> TestnetHandle<C, Pool> {
241        let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
242        let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
243        let mut net = self;
244        let handle = tokio::task::spawn(async move {
245            let mut tx = None;
246            tokio::select! {
247                _ = &mut net => {}
248                inc = rx => {
249                    tx = inc.ok();
250                }
251            }
252            if let Some(tx) = tx {
253                let _ = tx.send(net);
254            }
255        });
256
257        TestnetHandle { _handle: handle, peers, terminate: tx }
258    }
259}
260
261impl Testnet<NoopProvider, TestPool> {
262    /// Same as [`Self::try_create`] but panics on error
263    pub async fn create(num_peers: usize) -> Self {
264        Self::try_create(num_peers).await.unwrap()
265    }
266
267    /// Creates a new [`Testnet`] with the given number of peers
268    pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
269        let mut this = Self::default();
270
271        this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
272        Ok(this)
273    }
274
275    /// Add a peer to the [`Testnet`]
276    pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
277        self.add_peer_with_config(Default::default()).await
278    }
279}
280
281impl<C, Pool> Default for Testnet<C, Pool> {
282    fn default() -> Self {
283        Self { peers: Vec::new() }
284    }
285}
286
287impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.debug_struct("Testnet {{}}").finish_non_exhaustive()
290    }
291}
292
293impl<C, Pool> Future for Testnet<C, Pool>
294where
295    C: BlockReader<
296            Block = reth_primitives::Block,
297            Receipt = reth_primitives::Receipt,
298            Header = reth_primitives::Header,
299        > + HeaderProvider
300        + Unpin
301        + 'static,
302    Pool: TransactionPool<
303            Transaction: PoolTransaction<
304                Consensus = TransactionSigned,
305                Pooled = PooledTransactionsElement,
306            >,
307        > + Unpin
308        + 'static,
309{
310    type Output = ();
311
312    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
313        let this = self.get_mut();
314        for peer in &mut this.peers {
315            let _ = peer.poll_unpin(cx);
316        }
317        Poll::Pending
318    }
319}
320
/// A handle to a [`Testnet`] that can be shared.
///
/// Created by [`Testnet::spawn`].
#[derive(Debug)]
pub struct TestnetHandle<C, Pool> {
    /// Join handle of the task the [`Testnet`] was spawned on.
    _handle: JoinHandle<()>,
    /// Handles of all peers that were part of the [`Testnet`] when it was spawned.
    peers: Vec<PeerHandle<Pool>>,
    /// Sends the termination request; the request carries the sender through which the spawned
    /// task hands the [`Testnet`] back.
    terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
}
328
329// === impl TestnetHandle ===
330
331impl<C, Pool> TestnetHandle<C, Pool> {
332    /// Terminates the task and returns the [`Testnet`] back.
333    pub async fn terminate(self) -> Testnet<C, Pool> {
334        let (tx, rx) = oneshot::channel();
335        self.terminate.send(tx).unwrap();
336        rx.await.unwrap()
337    }
338
339    /// Returns the [`PeerHandle`]s of this [`Testnet`].
340    pub fn peers(&self) -> &[PeerHandle<Pool>] {
341        &self.peers
342    }
343
344    /// Connects all peers with each other.
345    ///
346    /// This establishes sessions concurrently between all peers.
347    ///
348    /// Returns once all sessions are established.
349    pub async fn connect_peers(&self) {
350        if self.peers.len() < 2 {
351            return
352        }
353
354        // add an event stream for _each_ peer
355        let streams =
356            self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
357
358        // add all peers to each other
359        for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
360            for idx in (idx + 1)..self.peers.len() {
361                let neighbour = &self.peers[idx];
362                handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
363            }
364        }
365
366        // await all sessions to be established
367        let num_sessions_per_peer = self.peers.len() - 1;
368        let fut = streams.into_iter().map(|mut stream| async move {
369            stream.take_session_established(num_sessions_per_peer).await
370        });
371
372        futures::future::join_all(fut).await;
373    }
374}
375
/// A peer in the [`Testnet`].
///
/// The peer is a [`Future`] that polls the request handler and the transactions manager, if
/// installed, and then the network.
#[pin_project]
#[derive(Debug)]
pub struct Peer<C, Pool = TestPool> {
    /// The network manager of this peer.
    #[pin]
    network: NetworkManager<EthNetworkPrimitives>,
    /// The eth request handler, `None` until one is installed.
    #[pin]
    request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
    /// The transactions manager, `None` until one is installed.
    #[pin]
    transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
    /// The transaction pool, set together with the transactions manager.
    pool: Option<Pool>,
    /// The client that is handed to the request handler.
    client: C,
    /// The secret key taken from the [`PeerConfig`] this peer was created from.
    secret_key: SecretKey,
}
390
391// === impl Peer ===
392
393impl<C, Pool> Peer<C, Pool>
394where
395    C: BlockReader + HeaderProvider + Clone + 'static,
396    Pool: TransactionPool,
397{
398    /// Returns the number of connected peers.
399    pub fn num_peers(&self) -> usize {
400        self.network.num_connected_peers()
401    }
402
403    /// Adds an additional protocol handler to the peer.
404    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
405        self.network.add_rlpx_sub_protocol(protocol);
406    }
407
408    /// Returns a handle to the peer's network.
409    pub fn peer_handle(&self) -> PeerHandle<Pool> {
410        PeerHandle {
411            network: self.network.handle().clone(),
412            pool: self.pool.clone(),
413            transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
414        }
415    }
416
417    /// The address that listens for incoming connections.
418    pub const fn local_addr(&self) -> SocketAddr {
419        self.network.local_addr()
420    }
421
422    /// The [`PeerId`] of this peer.
423    pub fn peer_id(&self) -> PeerId {
424        *self.network.peer_id()
425    }
426
427    /// Returns mutable access to the network.
428    pub fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
429        &mut self.network
430    }
431
432    /// Returns the [`NetworkHandle`] of this peer.
433    pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
434        self.network.handle().clone()
435    }
436
437    /// Returns the [`TestPool`] of this peer.
438    pub const fn pool(&self) -> Option<&Pool> {
439        self.pool.as_ref()
440    }
441
442    /// Set a new request handler that's connected to the peer's network
443    pub fn install_request_handler(&mut self) {
444        let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
445        self.network.set_eth_request_handler(tx);
446        let peers = self.network.peers_handle();
447        let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
448        self.request_handler = Some(request_handler);
449    }
450
451    /// Set a new transactions manager that's connected to the peer's network
452    pub fn install_transactions_manager(&mut self, pool: Pool) {
453        let (tx, rx) = unbounded_channel();
454        self.network.set_transactions(tx);
455        let transactions_manager = TransactionsManager::new(
456            self.handle(),
457            pool.clone(),
458            rx,
459            TransactionsManagerConfig::default(),
460        );
461        self.transactions_manager = Some(transactions_manager);
462        self.pool = Some(pool);
463    }
464
465    /// Set a new transactions manager that's connected to the peer's network
466    pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
467    where
468        P: TransactionPool,
469    {
470        let Self { mut network, request_handler, client, secret_key, .. } = self;
471        let (tx, rx) = unbounded_channel();
472        network.set_transactions(tx);
473        let transactions_manager = TransactionsManager::new(
474            network.handle().clone(),
475            pool.clone(),
476            rx,
477            TransactionsManagerConfig::default(),
478        );
479        Peer {
480            network,
481            request_handler,
482            transactions_manager: Some(transactions_manager),
483            pool: Some(pool),
484            client,
485            secret_key,
486        }
487    }
488
489    /// Map transactions manager with custom config
490    pub fn map_transactions_manager_with_config<P>(
491        self,
492        pool: P,
493        config: TransactionsManagerConfig,
494    ) -> Peer<C, P>
495    where
496        P: TransactionPool,
497    {
498        let Self { mut network, request_handler, client, secret_key, .. } = self;
499        let (tx, rx) = unbounded_channel();
500        network.set_transactions(tx);
501
502        let transactions_manager = TransactionsManager::new(
503            network.handle().clone(),
504            pool.clone(),
505            rx,
506            config, // Use provided config
507        );
508
509        Peer {
510            network,
511            request_handler,
512            transactions_manager: Some(transactions_manager),
513            pool: Some(pool),
514            client,
515            secret_key,
516        }
517    }
518}
519
520impl<C> Peer<C>
521where
522    C: BlockReader + HeaderProvider + Clone + 'static,
523{
524    /// Installs a new [`TestPool`]
525    pub fn install_test_pool(&mut self) {
526        self.install_transactions_manager(TestPoolBuilder::default().into())
527    }
528}
529
530impl<C, Pool> Future for Peer<C, Pool>
531where
532    C: BlockReader<
533            Block = reth_primitives::Block,
534            Receipt = reth_primitives::Receipt,
535            Header = reth_primitives::Header,
536        > + HeaderProvider
537        + Unpin
538        + 'static,
539    Pool: TransactionPool<
540            Transaction: PoolTransaction<
541                Consensus = TransactionSigned,
542                Pooled = PooledTransactionsElement,
543            >,
544        > + Unpin
545        + 'static,
546{
547    type Output = ();
548
549    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
550        let this = self.project();
551
552        if let Some(request) = this.request_handler.as_pin_mut() {
553            let _ = request.poll(cx);
554        }
555
556        if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
557            let _ = tx_manager.poll(cx);
558        }
559
560        this.network.poll(cx)
561    }
562}
563
/// A helper config for setting up the reth networking stack.
///
/// Use [`PeerConfig::launch`] to turn the config into a [`Peer`].
#[derive(Debug)]
pub struct PeerConfig<C = NoopProvider> {
    /// The config the [`NetworkManager`] is created from.
    config: NetworkConfig<C>,
    /// The client that is stored in the launched [`Peer`].
    client: C,
    /// The secret key the network config was built with.
    secret_key: SecretKey,
}
571
/// A handle to a peer in the [`Testnet`].
///
/// Created by [`Peer::peer_handle`].
#[derive(Debug)]
pub struct PeerHandle<Pool> {
    /// Handle to the network of the peer.
    network: NetworkHandle<EthNetworkPrimitives>,
    /// Handle to the transactions manager of the peer, `None` if the peer had none installed
    /// when this handle was created.
    transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
    /// Clone of the transaction pool of the peer, `None` if the peer had none installed when
    /// this handle was created.
    pool: Option<Pool>,
}
579
580// === impl PeerHandle ===
581
impl<Pool> PeerHandle<Pool> {
    /// Returns the [`PeerId`] used in the network.
    pub fn peer_id(&self) -> &PeerId {
        self.network.peer_id()
    }

    /// Returns the [`PeersHandle`] from the network.
    pub fn peer_handle(&self) -> &PeersHandle {
        self.network.peers_handle()
    }

    /// Returns the local socket as configured for the network.
    pub fn local_addr(&self) -> SocketAddr {
        self.network.local_addr()
    }

    /// Creates a new [`NetworkEvent`] listener channel.
    pub fn event_listener(&self) -> EventStream<NetworkEvent> {
        self.network.event_listener()
    }

    /// Returns the [`TransactionsHandle`] of this peer, or `None` if the peer had no
    /// transactions manager installed when this handle was created.
    pub const fn transactions(&self) -> Option<&TransactionsHandle> {
        self.transactions.as_ref()
    }

    /// Returns the transaction pool of this peer, or `None` if the peer had no pool installed
    /// when this handle was created.
    ///
    /// The pool type is the generic `Pool` of this handle, not necessarily a [`TestPool`].
    pub const fn pool(&self) -> Option<&Pool> {
        self.pool.as_ref()
    }

    /// Returns the [`NetworkHandle`] of this peer.
    pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
        &self.network
    }
}
618
619// === impl PeerConfig ===
620
621impl<C> PeerConfig<C>
622where
623    C: BlockReader + HeaderProvider + Clone + 'static,
624{
625    /// Launches the network and returns the [Peer] that manages it
626    pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
627        let Self { config, client, secret_key } = self;
628        let network = NetworkManager::new(config).await?;
629        let peer = Peer {
630            network,
631            client,
632            secret_key,
633            request_handler: None,
634            transactions_manager: None,
635            pool: None,
636        };
637        Ok(peer)
638    }
639
640    /// Initialize the network with a random secret key, allowing the devp2p and discovery to bind
641    /// to any available IP and port.
642    pub fn new(client: C) -> Self
643    where
644        C: ChainSpecProvider<ChainSpec: Hardforks>,
645    {
646        let secret_key = SecretKey::new(&mut rand::thread_rng());
647        let config = Self::network_config_builder(secret_key).build(client.clone());
648        Self { config, client, secret_key }
649    }
650
651    /// Initialize the network with a given secret key, allowing devp2p and discovery to bind any
652    /// available IP and port.
653    pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
654    where
655        C: ChainSpecProvider<ChainSpec: Hardforks>,
656    {
657        let config = Self::network_config_builder(secret_key).build(client.clone());
658        Self { config, client, secret_key }
659    }
660
661    /// Initialize the network with a given capabilities.
662    pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
663    where
664        C: ChainSpecProvider<ChainSpec: Hardforks>,
665    {
666        let secret_key = SecretKey::new(&mut rand::thread_rng());
667
668        let builder = Self::network_config_builder(secret_key);
669        let hello_message =
670            HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
671        let config = builder.hello_message(hello_message).build(client.clone());
672
673        Self { config, client, secret_key }
674    }
675
676    fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
677        NetworkConfigBuilder::new(secret_key)
678            .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
679            .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
680            .disable_dns_discovery()
681            .disable_discv4_discovery()
682    }
683}
684
685impl Default for PeerConfig {
686    fn default() -> Self {
687        Self::new(NoopProvider::default())
688    }
689}
690
/// A helper type to await network events
///
/// This makes it easier to await established connections
#[derive(Debug)]
pub struct NetworkEventStream {
    /// The wrapped stream of network events.
    inner: EventStream<NetworkEvent>,
}
698
699// === impl NetworkEventStream ===
700
701impl NetworkEventStream {
702    /// Create a new [`NetworkEventStream`] from the given network event receiver stream.
703    pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
704        Self { inner }
705    }
706
707    /// Awaits the next event for a session to be closed
708    pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
709        while let Some(ev) = self.inner.next().await {
710            match ev {
711                NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => {
712                    return Some((peer_id, reason))
713                }
714                _ => continue,
715            }
716        }
717        None
718    }
719
720    /// Awaits the next event for an established session
721    pub async fn next_session_established(&mut self) -> Option<PeerId> {
722        while let Some(ev) = self.inner.next().await {
723            match ev {
724                NetworkEvent::ActivePeerSession { info, .. } |
725                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
726                    return Some(info.peer_id)
727                }
728                _ => continue,
729            }
730        }
731        None
732    }
733
734    /// Awaits the next `num` events for an established session
735    pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
736        if num == 0 {
737            return Vec::new();
738        }
739        let mut peers = Vec::with_capacity(num);
740        while let Some(ev) = self.inner.next().await {
741            match ev {
742                NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } => {
743                    peers.push(peer_id);
744                    num -= 1;
745                    if num == 0 {
746                        return peers;
747                    }
748                }
749                _ => continue,
750            }
751        }
752        peers
753    }
754
755    /// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and
756    /// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established
757    /// session.
758    pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
759        let peer_id = match self.inner.next().await {
760            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
761            _ => return None,
762        };
763
764        match self.inner.next().await {
765            Some(NetworkEvent::ActivePeerSession {
766                info: SessionInfo { peer_id: peer_id2, .. },
767                ..
768            }) => {
769                debug_assert_eq!(
770                    peer_id, peer_id2,
771                    "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
772                );
773                Some(peer_id)
774            }
775            _ => None,
776        }
777    }
778}