reth_network/test_utils/
testnet.rs

1//! A network implementation for testing purposes.
2
3use crate::{
4    builder::ETH_REQUEST_CHANNEL_CAPACITY,
5    error::NetworkError,
6    eth_requests::EthRequestHandler,
7    protocol::IntoRlpxSubProtocol,
8    transactions::{
9        config::TransactionPropagationKind, TransactionsHandle, TransactionsManager,
10        TransactionsManagerConfig,
11    },
12    NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
13};
14use futures::{FutureExt, StreamExt};
15use pin_project::pin_project;
16use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
17use reth_eth_wire::{
18    protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
19};
20use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
21use reth_network_api::{
22    events::{PeerEvent, SessionInfo},
23    test_utils::{PeersHandle, PeersHandleProvider},
24    NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
25};
26use reth_network_peers::PeerId;
27use reth_storage_api::{
28    noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
29};
30use reth_tasks::TokioTaskExecutor;
31use reth_tokio_util::EventStream;
32use reth_transaction_pool::{
33    blobstore::InMemoryBlobStore,
34    test_utils::{TestPool, TestPoolBuilder},
35    EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
36};
37use secp256k1::SecretKey;
38use std::{
39    fmt,
40    future::Future,
41    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
42    pin::Pin,
43    task::{Context, Poll},
44};
45use tokio::{
46    sync::{
47        mpsc::{channel, unbounded_channel},
48        oneshot,
49    },
50    task::JoinHandle,
51};
52
53/// A test network consisting of multiple peers.
54pub struct Testnet<C, Pool> {
55    /// All running peers in the network.
56    peers: Vec<Peer<C, Pool>>,
57}
58
59// === impl Testnet ===
60
61impl<C> Testnet<C, TestPool>
62where
63    C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
64{
65    /// Same as [`Self::try_create_with`] but panics on error
66    pub async fn create_with(num_peers: usize, provider: C) -> Self {
67        Self::try_create_with(num_peers, provider).await.unwrap()
68    }
69
70    /// Creates a new [`Testnet`] with the given number of peers and the provider.
71    pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
72        let mut this = Self { peers: Vec::with_capacity(num_peers) };
73        for _ in 0..num_peers {
74            let config = PeerConfig::new(provider.clone());
75            this.add_peer_with_config(config).await?;
76        }
77        Ok(this)
78    }
79
80    /// Extend the list of peers with new peers that are configured with each of the given
81    /// [`PeerConfig`]s.
82    pub async fn extend_peer_with_config(
83        &mut self,
84        configs: impl IntoIterator<Item = PeerConfig<C>>,
85    ) -> Result<(), NetworkError> {
86        let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
87        let peers = futures::future::join_all(peers).await;
88        for peer in peers {
89            self.peers.push(peer?);
90        }
91        Ok(())
92    }
93}
94
95impl<C, Pool> Testnet<C, Pool>
96where
97    C: BlockReader + HeaderProvider + Clone + 'static,
98    Pool: TransactionPool,
99{
100    /// Return a mutable slice of all peers.
101    pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
102        &mut self.peers
103    }
104
105    /// Return a slice of all peers.
106    pub fn peers(&self) -> &[Peer<C, Pool>] {
107        &self.peers
108    }
109
110    /// Remove a peer from the [`Testnet`] and return it.
111    ///
112    /// # Panics
113    /// If the index is out of bounds.
114    pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
115        self.peers.remove(index)
116    }
117
118    /// Return a mutable iterator over all peers.
119    pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
120        self.peers.iter_mut()
121    }
122
123    /// Return an iterator over all peers.
124    pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
125        self.peers.iter()
126    }
127
128    /// Add a peer to the [`Testnet`] with the given [`PeerConfig`].
129    pub async fn add_peer_with_config(
130        &mut self,
131        config: PeerConfig<C>,
132    ) -> Result<(), NetworkError> {
133        let PeerConfig { config, client, secret_key } = config;
134
135        let network = NetworkManager::new(config).await?;
136        let peer = Peer {
137            network,
138            client,
139            secret_key,
140            request_handler: None,
141            transactions_manager: None,
142            pool: None,
143        };
144        self.peers.push(peer);
145        Ok(())
146    }
147
148    /// Returns all handles to the networks
149    pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
150        self.peers.iter().map(|p| p.handle())
151    }
152
153    /// Maps the pool of each peer with the given closure
154    pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
155    where
156        F: Fn(Peer<C, Pool>) -> Peer<C, P>,
157        P: TransactionPool,
158    {
159        Testnet { peers: self.peers.into_iter().map(f).collect() }
160    }
161
162    /// Apply a closure on each peer
163    pub fn for_each<F>(&self, f: F)
164    where
165        F: Fn(&Peer<C, Pool>),
166    {
167        self.peers.iter().for_each(f)
168    }
169
170    /// Apply a closure on each peer
171    pub fn for_each_mut<F>(&mut self, f: F)
172    where
173        F: FnMut(&mut Peer<C, Pool>),
174    {
175        self.peers.iter_mut().for_each(f)
176    }
177}
178
179impl<C, Pool> Testnet<C, Pool>
180where
181    C: ChainSpecProvider<ChainSpec: EthereumHardforks>
182        + StateProviderFactory
183        + BlockReaderIdExt
184        + HeaderProvider
185        + Clone
186        + 'static,
187    Pool: TransactionPool,
188{
189    /// Installs an eth pool on each peer
190    pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
191        self.map_pool(|peer| {
192            let blob_store = InMemoryBlobStore::default();
193            let pool = TransactionValidationTaskExecutor::eth(
194                peer.client.clone(),
195                blob_store.clone(),
196                TokioTaskExecutor::default(),
197            );
198            peer.map_transactions_manager(EthTransactionPool::eth_pool(
199                pool,
200                blob_store,
201                Default::default(),
202            ))
203        })
204    }
205
206    /// Installs an eth pool on each peer with custom transaction manager config
207    pub fn with_eth_pool_config(
208        self,
209        tx_manager_config: TransactionsManagerConfig,
210    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
211        self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
212    }
213
214    /// Installs an eth pool on each peer with custom transaction manager config and policy.
215    pub fn with_eth_pool_config_and_policy(
216        self,
217        tx_manager_config: TransactionsManagerConfig,
218        policy: TransactionPropagationKind,
219    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
220        self.map_pool(|peer| {
221            let blob_store = InMemoryBlobStore::default();
222            let pool = TransactionValidationTaskExecutor::eth(
223                peer.client.clone(),
224                blob_store.clone(),
225                TokioTaskExecutor::default(),
226            );
227
228            peer.map_transactions_manager_with(
229                EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
230                tx_manager_config.clone(),
231                policy,
232            )
233        })
234    }
235}
236
237impl<C, Pool> Testnet<C, Pool>
238where
239    C: BlockReader<
240            Block = reth_ethereum_primitives::Block,
241            Receipt = reth_ethereum_primitives::Receipt,
242            Header = alloy_consensus::Header,
243        > + HeaderProvider
244        + Clone
245        + Unpin
246        + 'static,
247    Pool: TransactionPool<
248            Transaction: PoolTransaction<
249                Consensus = TransactionSigned,
250                Pooled = PooledTransactionVariant,
251            >,
252        > + Unpin
253        + 'static,
254{
255    /// Spawns the testnet to a separate task
256    pub fn spawn(self) -> TestnetHandle<C, Pool> {
257        let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
258        let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
259        let mut net = self;
260        let handle = tokio::task::spawn(async move {
261            let mut tx = None;
262            tokio::select! {
263                _ = &mut net => {}
264                inc = rx => {
265                    tx = inc.ok();
266                }
267            }
268            if let Some(tx) = tx {
269                let _ = tx.send(net);
270            }
271        });
272
273        TestnetHandle { _handle: handle, peers, terminate: tx }
274    }
275}
276
277impl Testnet<NoopProvider, TestPool> {
278    /// Same as [`Self::try_create`] but panics on error
279    pub async fn create(num_peers: usize) -> Self {
280        Self::try_create(num_peers).await.unwrap()
281    }
282
283    /// Creates a new [`Testnet`] with the given number of peers
284    pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
285        let mut this = Self::default();
286
287        this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
288        Ok(this)
289    }
290
291    /// Add a peer to the [`Testnet`]
292    pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
293        self.add_peer_with_config(Default::default()).await
294    }
295}
296
297impl<C, Pool> Default for Testnet<C, Pool> {
298    fn default() -> Self {
299        Self { peers: Vec::new() }
300    }
301}
302
303impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
304    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305        f.debug_struct("Testnet {{}}").finish_non_exhaustive()
306    }
307}
308
309impl<C, Pool> Future for Testnet<C, Pool>
310where
311    C: BlockReader<
312            Block = reth_ethereum_primitives::Block,
313            Receipt = reth_ethereum_primitives::Receipt,
314            Header = alloy_consensus::Header,
315        > + HeaderProvider
316        + Unpin
317        + 'static,
318    Pool: TransactionPool<
319            Transaction: PoolTransaction<
320                Consensus = TransactionSigned,
321                Pooled = PooledTransactionVariant,
322            >,
323        > + Unpin
324        + 'static,
325{
326    type Output = ();
327
328    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
329        let this = self.get_mut();
330        for peer in &mut this.peers {
331            let _ = peer.poll_unpin(cx);
332        }
333        Poll::Pending
334    }
335}
336
337/// A handle to a [`Testnet`] that can be shared.
338#[derive(Debug)]
339pub struct TestnetHandle<C, Pool> {
340    _handle: JoinHandle<()>,
341    peers: Vec<PeerHandle<Pool>>,
342    terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
343}
344
345// === impl TestnetHandle ===
346
347impl<C, Pool> TestnetHandle<C, Pool> {
348    /// Terminates the task and returns the [`Testnet`] back.
349    pub async fn terminate(self) -> Testnet<C, Pool> {
350        let (tx, rx) = oneshot::channel();
351        self.terminate.send(tx).unwrap();
352        rx.await.unwrap()
353    }
354
355    /// Returns the [`PeerHandle`]s of this [`Testnet`].
356    pub fn peers(&self) -> &[PeerHandle<Pool>] {
357        &self.peers
358    }
359
360    /// Connects all peers with each other.
361    ///
362    /// This establishes sessions concurrently between all peers.
363    ///
364    /// Returns once all sessions are established.
365    pub async fn connect_peers(&self) {
366        if self.peers.len() < 2 {
367            return
368        }
369
370        // add an event stream for _each_ peer
371        let streams =
372            self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
373
374        // add all peers to each other
375        for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
376            for idx in (idx + 1)..self.peers.len() {
377                let neighbour = &self.peers[idx];
378                handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
379            }
380        }
381
382        // await all sessions to be established
383        let num_sessions_per_peer = self.peers.len() - 1;
384        let fut = streams.into_iter().map(|mut stream| async move {
385            stream.take_session_established(num_sessions_per_peer).await
386        });
387
388        futures::future::join_all(fut).await;
389    }
390}
391
392/// A peer in the [`Testnet`].
393#[pin_project]
394#[derive(Debug)]
395pub struct Peer<C, Pool = TestPool> {
396    #[pin]
397    network: NetworkManager<EthNetworkPrimitives>,
398    #[pin]
399    request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
400    #[pin]
401    transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
402    pool: Option<Pool>,
403    client: C,
404    secret_key: SecretKey,
405}
406
407// === impl Peer ===
408
409impl<C, Pool> Peer<C, Pool>
410where
411    C: BlockReader + HeaderProvider + Clone + 'static,
412    Pool: TransactionPool,
413{
414    /// Returns the number of connected peers.
415    pub fn num_peers(&self) -> usize {
416        self.network.num_connected_peers()
417    }
418
419    /// Adds an additional protocol handler to the peer.
420    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
421        self.network.add_rlpx_sub_protocol(protocol);
422    }
423
424    /// Returns a handle to the peer's network.
425    pub fn peer_handle(&self) -> PeerHandle<Pool> {
426        PeerHandle {
427            network: self.network.handle().clone(),
428            pool: self.pool.clone(),
429            transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
430        }
431    }
432
433    /// The address that listens for incoming connections.
434    pub const fn local_addr(&self) -> SocketAddr {
435        self.network.local_addr()
436    }
437
438    /// The [`PeerId`] of this peer.
439    pub fn peer_id(&self) -> PeerId {
440        *self.network.peer_id()
441    }
442
443    /// Returns mutable access to the network.
444    pub const fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
445        &mut self.network
446    }
447
448    /// Returns the [`NetworkHandle`] of this peer.
449    pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
450        self.network.handle().clone()
451    }
452
453    /// Returns the [`TestPool`] of this peer.
454    pub const fn pool(&self) -> Option<&Pool> {
455        self.pool.as_ref()
456    }
457
458    /// Set a new request handler that's connected to the peer's network
459    pub fn install_request_handler(&mut self) {
460        let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
461        self.network.set_eth_request_handler(tx);
462        let peers = self.network.peers_handle();
463        let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
464        self.request_handler = Some(request_handler);
465    }
466
467    /// Set a new transactions manager that's connected to the peer's network
468    pub fn install_transactions_manager(&mut self, pool: Pool) {
469        let (tx, rx) = unbounded_channel();
470        self.network.set_transactions(tx);
471        let transactions_manager = TransactionsManager::new(
472            self.handle(),
473            pool.clone(),
474            rx,
475            TransactionsManagerConfig::default(),
476        );
477        self.transactions_manager = Some(transactions_manager);
478        self.pool = Some(pool);
479    }
480
481    /// Set a new transactions manager that's connected to the peer's network
482    pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
483    where
484        P: TransactionPool,
485    {
486        let Self { mut network, request_handler, client, secret_key, .. } = self;
487        let (tx, rx) = unbounded_channel();
488        network.set_transactions(tx);
489        let transactions_manager = TransactionsManager::new(
490            network.handle().clone(),
491            pool.clone(),
492            rx,
493            TransactionsManagerConfig::default(),
494        );
495        Peer {
496            network,
497            request_handler,
498            transactions_manager: Some(transactions_manager),
499            pool: Some(pool),
500            client,
501            secret_key,
502        }
503    }
504
505    /// Map transactions manager with custom config
506    pub fn map_transactions_manager_with_config<P>(
507        self,
508        pool: P,
509        config: TransactionsManagerConfig,
510    ) -> Peer<C, P>
511    where
512        P: TransactionPool,
513    {
514        self.map_transactions_manager_with(pool, config, Default::default())
515    }
516
517    /// Map transactions manager with custom config and the given policy.
518    pub fn map_transactions_manager_with<P>(
519        self,
520        pool: P,
521        config: TransactionsManagerConfig,
522        policy: TransactionPropagationKind,
523    ) -> Peer<C, P>
524    where
525        P: TransactionPool,
526    {
527        let Self { mut network, request_handler, client, secret_key, .. } = self;
528        let (tx, rx) = unbounded_channel();
529        network.set_transactions(tx);
530
531        let transactions_manager = TransactionsManager::with_policy(
532            network.handle().clone(),
533            pool.clone(),
534            rx,
535            config,
536            policy,
537        );
538
539        Peer {
540            network,
541            request_handler,
542            transactions_manager: Some(transactions_manager),
543            pool: Some(pool),
544            client,
545            secret_key,
546        }
547    }
548}
549
550impl<C> Peer<C>
551where
552    C: BlockReader + HeaderProvider + Clone + 'static,
553{
554    /// Installs a new [`TestPool`]
555    pub fn install_test_pool(&mut self) {
556        self.install_transactions_manager(TestPoolBuilder::default().into())
557    }
558}
559
560impl<C, Pool> Future for Peer<C, Pool>
561where
562    C: BlockReader<
563            Block = reth_ethereum_primitives::Block,
564            Receipt = reth_ethereum_primitives::Receipt,
565            Header = alloy_consensus::Header,
566        > + HeaderProvider
567        + Unpin
568        + 'static,
569    Pool: TransactionPool<
570            Transaction: PoolTransaction<
571                Consensus = TransactionSigned,
572                Pooled = PooledTransactionVariant,
573            >,
574        > + Unpin
575        + 'static,
576{
577    type Output = ();
578
579    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
580        let this = self.project();
581
582        if let Some(request) = this.request_handler.as_pin_mut() {
583            let _ = request.poll(cx);
584        }
585
586        if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
587            let _ = tx_manager.poll(cx);
588        }
589
590        this.network.poll(cx)
591    }
592}
593
594/// A helper config for setting up the reth networking stack.
595#[derive(Debug)]
596pub struct PeerConfig<C = NoopProvider> {
597    config: NetworkConfig<C>,
598    client: C,
599    secret_key: SecretKey,
600}
601
602/// A handle to a peer in the [`Testnet`].
603#[derive(Debug)]
604pub struct PeerHandle<Pool> {
605    network: NetworkHandle<EthNetworkPrimitives>,
606    transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
607    pool: Option<Pool>,
608}
609
610// === impl PeerHandle ===
611
612impl<Pool> PeerHandle<Pool> {
613    /// Returns the [`PeerId`] used in the network.
614    pub fn peer_id(&self) -> &PeerId {
615        self.network.peer_id()
616    }
617
618    /// Returns the [`PeersHandle`] from the network.
619    pub fn peer_handle(&self) -> &PeersHandle {
620        self.network.peers_handle()
621    }
622
623    /// Returns the local socket as configured for the network.
624    pub fn local_addr(&self) -> SocketAddr {
625        self.network.local_addr()
626    }
627
628    /// Creates a new [`NetworkEvent`] listener channel.
629    pub fn event_listener(&self) -> EventStream<NetworkEvent> {
630        self.network.event_listener()
631    }
632
633    /// Returns the [`TransactionsHandle`] of this peer.
634    pub const fn transactions(&self) -> Option<&TransactionsHandle> {
635        self.transactions.as_ref()
636    }
637
638    /// Returns the [`TestPool`] of this peer.
639    pub const fn pool(&self) -> Option<&Pool> {
640        self.pool.as_ref()
641    }
642
643    /// Returns the [`NetworkHandle`] of this peer.
644    pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
645        &self.network
646    }
647}
648
649// === impl PeerConfig ===
650
651impl<C> PeerConfig<C>
652where
653    C: BlockReader + HeaderProvider + Clone + 'static,
654{
655    /// Launches the network and returns the [Peer] that manages it
656    pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
657        let Self { config, client, secret_key } = self;
658        let network = NetworkManager::new(config).await?;
659        let peer = Peer {
660            network,
661            client,
662            secret_key,
663            request_handler: None,
664            transactions_manager: None,
665            pool: None,
666        };
667        Ok(peer)
668    }
669
670    /// Initialize the network with a random secret key, allowing the devp2p and discovery to bind
671    /// to any available IP and port.
672    pub fn new(client: C) -> Self
673    where
674        C: ChainSpecProvider<ChainSpec: Hardforks>,
675    {
676        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
677        let config = Self::network_config_builder(secret_key).build(client.clone());
678        Self { config, client, secret_key }
679    }
680
681    /// Initialize the network with a given secret key, allowing devp2p and discovery to bind any
682    /// available IP and port.
683    pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
684    where
685        C: ChainSpecProvider<ChainSpec: Hardforks>,
686    {
687        let config = Self::network_config_builder(secret_key).build(client.clone());
688        Self { config, client, secret_key }
689    }
690
691    /// Initialize the network with a given capabilities.
692    pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
693    where
694        C: ChainSpecProvider<ChainSpec: Hardforks>,
695    {
696        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
697
698        let builder = Self::network_config_builder(secret_key);
699        let hello_message =
700            HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
701        let config = builder.hello_message(hello_message).build(client.clone());
702
703        Self { config, client, secret_key }
704    }
705
706    fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
707        NetworkConfigBuilder::new(secret_key)
708            .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
709            .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
710            .disable_dns_discovery()
711            .disable_discv4_discovery()
712    }
713}
714
715impl Default for PeerConfig {
716    fn default() -> Self {
717        Self::new(NoopProvider::default())
718    }
719}
720
721/// A helper type to await network events
722///
723/// This makes it easier to await established connections
724#[derive(Debug)]
725pub struct NetworkEventStream {
726    inner: EventStream<NetworkEvent>,
727}
728
729// === impl NetworkEventStream ===
730
731impl NetworkEventStream {
732    /// Create a new [`NetworkEventStream`] from the given network event receiver stream.
733    pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
734        Self { inner }
735    }
736
737    /// Awaits the next event for a session to be closed
738    pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
739        while let Some(ev) = self.inner.next().await {
740            if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
741                return Some((peer_id, reason))
742            }
743        }
744        None
745    }
746
747    /// Awaits the next event for an established session
748    pub async fn next_session_established(&mut self) -> Option<PeerId> {
749        while let Some(ev) = self.inner.next().await {
750            match ev {
751                NetworkEvent::ActivePeerSession { info, .. } |
752                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
753                    return Some(info.peer_id)
754                }
755                _ => {}
756            }
757        }
758        None
759    }
760
761    /// Awaits the next `num` events for an established session
762    pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
763        if num == 0 {
764            return Vec::new();
765        }
766        let mut peers = Vec::with_capacity(num);
767        while let Some(ev) = self.inner.next().await {
768            if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
769                peers.push(peer_id);
770                num -= 1;
771                if num == 0 {
772                    return peers;
773                }
774            }
775        }
776        peers
777    }
778
779    /// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and
780    /// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established
781    /// session.
782    pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
783        let peer_id = match self.inner.next().await {
784            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
785            _ => return None,
786        };
787
788        match self.inner.next().await {
789            Some(NetworkEvent::ActivePeerSession {
790                info: SessionInfo { peer_id: peer_id2, .. },
791                ..
792            }) => {
793                debug_assert_eq!(
794                    peer_id, peer_id2,
795                    "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
796                );
797                Some(peer_id)
798            }
799            _ => None,
800        }
801    }
802
803    /// Awaits the next event for a peer added.
804    pub async fn peer_added(&mut self) -> Option<PeerId> {
805        let peer_id = match self.inner.next().await {
806            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
807            _ => return None,
808        };
809
810        Some(peer_id)
811    }
812
813    /// Awaits the next event for a peer removed.
814    pub async fn peer_removed(&mut self) -> Option<PeerId> {
815        let peer_id = match self.inner.next().await {
816            Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
817            _ => return None,
818        };
819
820        Some(peer_id)
821    }
822}