1use crate::{
4 builder::ETH_REQUEST_CHANNEL_CAPACITY,
5 error::NetworkError,
6 eth_requests::EthRequestHandler,
7 protocol::IntoRlpxSubProtocol,
8 transactions::{
9 config::TransactionPropagationKind, TransactionsHandle, TransactionsManager,
10 TransactionsManagerConfig,
11 },
12 NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
13};
14use futures::{FutureExt, StreamExt};
15use pin_project::pin_project;
16use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
17use reth_eth_wire::{
18 protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
19};
20use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
21use reth_network_api::{
22 events::{PeerEvent, SessionInfo},
23 test_utils::{PeersHandle, PeersHandleProvider},
24 NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
25};
26use reth_network_peers::PeerId;
27use reth_storage_api::{
28 noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
29};
30use reth_tasks::TokioTaskExecutor;
31use reth_tokio_util::EventStream;
32use reth_transaction_pool::{
33 blobstore::InMemoryBlobStore,
34 test_utils::{TestPool, TestPoolBuilder},
35 EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
36};
37use secp256k1::SecretKey;
38use std::{
39 fmt,
40 future::Future,
41 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
42 pin::Pin,
43 task::{Context, Poll},
44};
45use tokio::{
46 sync::{
47 mpsc::{channel, unbounded_channel},
48 oneshot,
49 },
50 task::JoinHandle,
51};
52
53pub struct Testnet<C, Pool> {
55 peers: Vec<Peer<C, Pool>>,
57}
58
59impl<C> Testnet<C, TestPool>
62where
63 C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
64{
65 pub async fn create_with(num_peers: usize, provider: C) -> Self {
67 Self::try_create_with(num_peers, provider).await.unwrap()
68 }
69
70 pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
72 let mut this = Self { peers: Vec::with_capacity(num_peers) };
73 for _ in 0..num_peers {
74 let config = PeerConfig::new(provider.clone());
75 this.add_peer_with_config(config).await?;
76 }
77 Ok(this)
78 }
79
80 pub async fn extend_peer_with_config(
83 &mut self,
84 configs: impl IntoIterator<Item = PeerConfig<C>>,
85 ) -> Result<(), NetworkError> {
86 let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
87 let peers = futures::future::join_all(peers).await;
88 for peer in peers {
89 self.peers.push(peer?);
90 }
91 Ok(())
92 }
93}
94
95impl<C, Pool> Testnet<C, Pool>
96where
97 C: BlockReader + HeaderProvider + Clone + 'static,
98 Pool: TransactionPool,
99{
100 pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
102 &mut self.peers
103 }
104
105 pub fn peers(&self) -> &[Peer<C, Pool>] {
107 &self.peers
108 }
109
110 pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
115 self.peers.remove(index)
116 }
117
118 pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
120 self.peers.iter_mut()
121 }
122
123 pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
125 self.peers.iter()
126 }
127
128 pub async fn add_peer_with_config(
130 &mut self,
131 config: PeerConfig<C>,
132 ) -> Result<(), NetworkError> {
133 let PeerConfig { config, client, secret_key } = config;
134
135 let network = NetworkManager::new(config).await?;
136 let peer = Peer {
137 network,
138 client,
139 secret_key,
140 request_handler: None,
141 transactions_manager: None,
142 pool: None,
143 };
144 self.peers.push(peer);
145 Ok(())
146 }
147
148 pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
150 self.peers.iter().map(|p| p.handle())
151 }
152
153 pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
155 where
156 F: Fn(Peer<C, Pool>) -> Peer<C, P>,
157 P: TransactionPool,
158 {
159 Testnet { peers: self.peers.into_iter().map(f).collect() }
160 }
161
162 pub fn for_each<F>(&self, f: F)
164 where
165 F: Fn(&Peer<C, Pool>),
166 {
167 self.peers.iter().for_each(f)
168 }
169
170 pub fn for_each_mut<F>(&mut self, f: F)
172 where
173 F: FnMut(&mut Peer<C, Pool>),
174 {
175 self.peers.iter_mut().for_each(f)
176 }
177}
178
179impl<C, Pool> Testnet<C, Pool>
180where
181 C: ChainSpecProvider<ChainSpec: EthereumHardforks>
182 + StateProviderFactory
183 + BlockReaderIdExt
184 + HeaderProvider
185 + Clone
186 + 'static,
187 Pool: TransactionPool,
188{
189 pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
191 self.map_pool(|peer| {
192 let blob_store = InMemoryBlobStore::default();
193 let pool = TransactionValidationTaskExecutor::eth(
194 peer.client.clone(),
195 blob_store.clone(),
196 TokioTaskExecutor::default(),
197 );
198 peer.map_transactions_manager(EthTransactionPool::eth_pool(
199 pool,
200 blob_store,
201 Default::default(),
202 ))
203 })
204 }
205
206 pub fn with_eth_pool_config(
208 self,
209 tx_manager_config: TransactionsManagerConfig,
210 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
211 self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
212 }
213
214 pub fn with_eth_pool_config_and_policy(
216 self,
217 tx_manager_config: TransactionsManagerConfig,
218 policy: TransactionPropagationKind,
219 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
220 self.map_pool(|peer| {
221 let blob_store = InMemoryBlobStore::default();
222 let pool = TransactionValidationTaskExecutor::eth(
223 peer.client.clone(),
224 blob_store.clone(),
225 TokioTaskExecutor::default(),
226 );
227
228 peer.map_transactions_manager_with(
229 EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
230 tx_manager_config.clone(),
231 policy,
232 )
233 })
234 }
235}
236
237impl<C, Pool> Testnet<C, Pool>
238where
239 C: BlockReader<
240 Block = reth_ethereum_primitives::Block,
241 Receipt = reth_ethereum_primitives::Receipt,
242 Header = alloy_consensus::Header,
243 > + HeaderProvider
244 + Clone
245 + Unpin
246 + 'static,
247 Pool: TransactionPool<
248 Transaction: PoolTransaction<
249 Consensus = TransactionSigned,
250 Pooled = PooledTransactionVariant,
251 >,
252 > + Unpin
253 + 'static,
254{
255 pub fn spawn(self) -> TestnetHandle<C, Pool> {
257 let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
258 let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
259 let mut net = self;
260 let handle = tokio::task::spawn(async move {
261 let mut tx = None;
262 tokio::select! {
263 _ = &mut net => {}
264 inc = rx => {
265 tx = inc.ok();
266 }
267 }
268 if let Some(tx) = tx {
269 let _ = tx.send(net);
270 }
271 });
272
273 TestnetHandle { _handle: handle, peers, terminate: tx }
274 }
275}
276
277impl Testnet<NoopProvider, TestPool> {
278 pub async fn create(num_peers: usize) -> Self {
280 Self::try_create(num_peers).await.unwrap()
281 }
282
283 pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
285 let mut this = Self::default();
286
287 this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
288 Ok(this)
289 }
290
291 pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
293 self.add_peer_with_config(Default::default()).await
294 }
295}
296
297impl<C, Pool> Default for Testnet<C, Pool> {
298 fn default() -> Self {
299 Self { peers: Vec::new() }
300 }
301}
302
303impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
304 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305 f.debug_struct("Testnet {{}}").finish_non_exhaustive()
306 }
307}
308
309impl<C, Pool> Future for Testnet<C, Pool>
310where
311 C: BlockReader<
312 Block = reth_ethereum_primitives::Block,
313 Receipt = reth_ethereum_primitives::Receipt,
314 Header = alloy_consensus::Header,
315 > + HeaderProvider
316 + Unpin
317 + 'static,
318 Pool: TransactionPool<
319 Transaction: PoolTransaction<
320 Consensus = TransactionSigned,
321 Pooled = PooledTransactionVariant,
322 >,
323 > + Unpin
324 + 'static,
325{
326 type Output = ();
327
328 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
329 let this = self.get_mut();
330 for peer in &mut this.peers {
331 let _ = peer.poll_unpin(cx);
332 }
333 Poll::Pending
334 }
335}
336
337#[derive(Debug)]
339pub struct TestnetHandle<C, Pool> {
340 _handle: JoinHandle<()>,
341 peers: Vec<PeerHandle<Pool>>,
342 terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
343}
344
345impl<C, Pool> TestnetHandle<C, Pool> {
348 pub async fn terminate(self) -> Testnet<C, Pool> {
350 let (tx, rx) = oneshot::channel();
351 self.terminate.send(tx).unwrap();
352 rx.await.unwrap()
353 }
354
355 pub fn peers(&self) -> &[PeerHandle<Pool>] {
357 &self.peers
358 }
359
360 pub async fn connect_peers(&self) {
366 if self.peers.len() < 2 {
367 return
368 }
369
370 let streams =
372 self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
373
374 for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
376 for idx in (idx + 1)..self.peers.len() {
377 let neighbour = &self.peers[idx];
378 handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
379 }
380 }
381
382 let num_sessions_per_peer = self.peers.len() - 1;
384 let fut = streams.into_iter().map(|mut stream| async move {
385 stream.take_session_established(num_sessions_per_peer).await
386 });
387
388 futures::future::join_all(fut).await;
389 }
390}
391
392#[pin_project]
394#[derive(Debug)]
395pub struct Peer<C, Pool = TestPool> {
396 #[pin]
397 network: NetworkManager<EthNetworkPrimitives>,
398 #[pin]
399 request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
400 #[pin]
401 transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
402 pool: Option<Pool>,
403 client: C,
404 secret_key: SecretKey,
405}
406
407impl<C, Pool> Peer<C, Pool>
410where
411 C: BlockReader + HeaderProvider + Clone + 'static,
412 Pool: TransactionPool,
413{
414 pub fn num_peers(&self) -> usize {
416 self.network.num_connected_peers()
417 }
418
419 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
421 self.network.add_rlpx_sub_protocol(protocol);
422 }
423
424 pub fn peer_handle(&self) -> PeerHandle<Pool> {
426 PeerHandle {
427 network: self.network.handle().clone(),
428 pool: self.pool.clone(),
429 transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
430 }
431 }
432
433 pub const fn local_addr(&self) -> SocketAddr {
435 self.network.local_addr()
436 }
437
438 pub fn peer_id(&self) -> PeerId {
440 *self.network.peer_id()
441 }
442
443 pub const fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
445 &mut self.network
446 }
447
448 pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
450 self.network.handle().clone()
451 }
452
453 pub const fn pool(&self) -> Option<&Pool> {
455 self.pool.as_ref()
456 }
457
458 pub fn install_request_handler(&mut self) {
460 let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
461 self.network.set_eth_request_handler(tx);
462 let peers = self.network.peers_handle();
463 let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
464 self.request_handler = Some(request_handler);
465 }
466
467 pub fn install_transactions_manager(&mut self, pool: Pool) {
469 let (tx, rx) = unbounded_channel();
470 self.network.set_transactions(tx);
471 let transactions_manager = TransactionsManager::new(
472 self.handle(),
473 pool.clone(),
474 rx,
475 TransactionsManagerConfig::default(),
476 );
477 self.transactions_manager = Some(transactions_manager);
478 self.pool = Some(pool);
479 }
480
481 pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
483 where
484 P: TransactionPool,
485 {
486 let Self { mut network, request_handler, client, secret_key, .. } = self;
487 let (tx, rx) = unbounded_channel();
488 network.set_transactions(tx);
489 let transactions_manager = TransactionsManager::new(
490 network.handle().clone(),
491 pool.clone(),
492 rx,
493 TransactionsManagerConfig::default(),
494 );
495 Peer {
496 network,
497 request_handler,
498 transactions_manager: Some(transactions_manager),
499 pool: Some(pool),
500 client,
501 secret_key,
502 }
503 }
504
505 pub fn map_transactions_manager_with_config<P>(
507 self,
508 pool: P,
509 config: TransactionsManagerConfig,
510 ) -> Peer<C, P>
511 where
512 P: TransactionPool,
513 {
514 self.map_transactions_manager_with(pool, config, Default::default())
515 }
516
517 pub fn map_transactions_manager_with<P>(
519 self,
520 pool: P,
521 config: TransactionsManagerConfig,
522 policy: TransactionPropagationKind,
523 ) -> Peer<C, P>
524 where
525 P: TransactionPool,
526 {
527 let Self { mut network, request_handler, client, secret_key, .. } = self;
528 let (tx, rx) = unbounded_channel();
529 network.set_transactions(tx);
530
531 let transactions_manager = TransactionsManager::with_policy(
532 network.handle().clone(),
533 pool.clone(),
534 rx,
535 config,
536 policy,
537 );
538
539 Peer {
540 network,
541 request_handler,
542 transactions_manager: Some(transactions_manager),
543 pool: Some(pool),
544 client,
545 secret_key,
546 }
547 }
548}
549
550impl<C> Peer<C>
551where
552 C: BlockReader + HeaderProvider + Clone + 'static,
553{
554 pub fn install_test_pool(&mut self) {
556 self.install_transactions_manager(TestPoolBuilder::default().into())
557 }
558}
559
560impl<C, Pool> Future for Peer<C, Pool>
561where
562 C: BlockReader<
563 Block = reth_ethereum_primitives::Block,
564 Receipt = reth_ethereum_primitives::Receipt,
565 Header = alloy_consensus::Header,
566 > + HeaderProvider
567 + Unpin
568 + 'static,
569 Pool: TransactionPool<
570 Transaction: PoolTransaction<
571 Consensus = TransactionSigned,
572 Pooled = PooledTransactionVariant,
573 >,
574 > + Unpin
575 + 'static,
576{
577 type Output = ();
578
579 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
580 let this = self.project();
581
582 if let Some(request) = this.request_handler.as_pin_mut() {
583 let _ = request.poll(cx);
584 }
585
586 if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
587 let _ = tx_manager.poll(cx);
588 }
589
590 this.network.poll(cx)
591 }
592}
593
594#[derive(Debug)]
596pub struct PeerConfig<C = NoopProvider> {
597 config: NetworkConfig<C>,
598 client: C,
599 secret_key: SecretKey,
600}
601
602#[derive(Debug)]
604pub struct PeerHandle<Pool> {
605 network: NetworkHandle<EthNetworkPrimitives>,
606 transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
607 pool: Option<Pool>,
608}
609
610impl<Pool> PeerHandle<Pool> {
613 pub fn peer_id(&self) -> &PeerId {
615 self.network.peer_id()
616 }
617
618 pub fn peer_handle(&self) -> &PeersHandle {
620 self.network.peers_handle()
621 }
622
623 pub fn local_addr(&self) -> SocketAddr {
625 self.network.local_addr()
626 }
627
628 pub fn event_listener(&self) -> EventStream<NetworkEvent> {
630 self.network.event_listener()
631 }
632
633 pub const fn transactions(&self) -> Option<&TransactionsHandle> {
635 self.transactions.as_ref()
636 }
637
638 pub const fn pool(&self) -> Option<&Pool> {
640 self.pool.as_ref()
641 }
642
643 pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
645 &self.network
646 }
647}
648
649impl<C> PeerConfig<C>
652where
653 C: BlockReader + HeaderProvider + Clone + 'static,
654{
655 pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
657 let Self { config, client, secret_key } = self;
658 let network = NetworkManager::new(config).await?;
659 let peer = Peer {
660 network,
661 client,
662 secret_key,
663 request_handler: None,
664 transactions_manager: None,
665 pool: None,
666 };
667 Ok(peer)
668 }
669
670 pub fn new(client: C) -> Self
673 where
674 C: ChainSpecProvider<ChainSpec: Hardforks>,
675 {
676 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
677 let config = Self::network_config_builder(secret_key).build(client.clone());
678 Self { config, client, secret_key }
679 }
680
681 pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
684 where
685 C: ChainSpecProvider<ChainSpec: Hardforks>,
686 {
687 let config = Self::network_config_builder(secret_key).build(client.clone());
688 Self { config, client, secret_key }
689 }
690
691 pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
693 where
694 C: ChainSpecProvider<ChainSpec: Hardforks>,
695 {
696 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
697
698 let builder = Self::network_config_builder(secret_key);
699 let hello_message =
700 HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
701 let config = builder.hello_message(hello_message).build(client.clone());
702
703 Self { config, client, secret_key }
704 }
705
706 fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
707 NetworkConfigBuilder::new(secret_key)
708 .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
709 .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
710 .disable_dns_discovery()
711 .disable_discv4_discovery()
712 }
713}
714
715impl Default for PeerConfig {
716 fn default() -> Self {
717 Self::new(NoopProvider::default())
718 }
719}
720
721#[derive(Debug)]
725pub struct NetworkEventStream {
726 inner: EventStream<NetworkEvent>,
727}
728
729impl NetworkEventStream {
732 pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
734 Self { inner }
735 }
736
737 pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
739 while let Some(ev) = self.inner.next().await {
740 if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
741 return Some((peer_id, reason))
742 }
743 }
744 None
745 }
746
747 pub async fn next_session_established(&mut self) -> Option<PeerId> {
749 while let Some(ev) = self.inner.next().await {
750 match ev {
751 NetworkEvent::ActivePeerSession { info, .. } |
752 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
753 return Some(info.peer_id)
754 }
755 _ => {}
756 }
757 }
758 None
759 }
760
761 pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
763 if num == 0 {
764 return Vec::new();
765 }
766 let mut peers = Vec::with_capacity(num);
767 while let Some(ev) = self.inner.next().await {
768 if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
769 peers.push(peer_id);
770 num -= 1;
771 if num == 0 {
772 return peers;
773 }
774 }
775 }
776 peers
777 }
778
779 pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
783 let peer_id = match self.inner.next().await {
784 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
785 _ => return None,
786 };
787
788 match self.inner.next().await {
789 Some(NetworkEvent::ActivePeerSession {
790 info: SessionInfo { peer_id: peer_id2, .. },
791 ..
792 }) => {
793 debug_assert_eq!(
794 peer_id, peer_id2,
795 "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
796 );
797 Some(peer_id)
798 }
799 _ => None,
800 }
801 }
802
803 pub async fn peer_added(&mut self) -> Option<PeerId> {
805 let peer_id = match self.inner.next().await {
806 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
807 _ => return None,
808 };
809
810 Some(peer_id)
811 }
812
813 pub async fn peer_removed(&mut self) -> Option<PeerId> {
815 let peer_id = match self.inner.next().await {
816 Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
817 _ => return None,
818 };
819
820 Some(peer_id)
821 }
822}