1use std::{
11 collections::VecDeque,
12 fmt,
13 future::Future,
14 io,
15 pin::{pin, Pin},
16 task::{ready, Context, Poll},
17};
18
19use crate::{
20 capability::{SharedCapabilities, SharedCapability, UnsupportedCapabilityError},
21 errors::{EthStreamError, P2PStreamError},
22 p2pstream::DisconnectP2P,
23 CanDisconnect, Capability, DisconnectReason, EthStream, P2PStream, UnauthedEthStream,
24 UnifiedStatus,
25};
26use bytes::{Bytes, BytesMut};
27use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
28use reth_eth_wire_types::NetworkPrimitives;
29use reth_ethereum_forks::ForkFilter;
30use tokio::sync::{mpsc, mpsc::UnboundedSender};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32
33#[derive(Debug)]
36pub struct RlpxProtocolMultiplexer<St> {
37 inner: MultiplexInner<St>,
38}
39
40impl<St> RlpxProtocolMultiplexer<St> {
41 pub fn new(conn: P2PStream<St>) -> Self {
43 Self {
44 inner: MultiplexInner {
45 conn,
46 protocols: Default::default(),
47 out_buffer: Default::default(),
48 },
49 }
50 }
51
52 pub fn install_protocol<F, Proto>(
57 &mut self,
58 cap: &Capability,
59 f: F,
60 ) -> Result<(), UnsupportedCapabilityError>
61 where
62 F: FnOnce(ProtocolConnection) -> Proto,
63 Proto: Stream<Item = BytesMut> + Send + 'static,
64 {
65 self.inner.install_protocol(cap, f)
66 }
67
68 pub const fn shared_capabilities(&self) -> &SharedCapabilities {
70 self.inner.shared_capabilities()
71 }
72
73 pub fn into_satellite_stream<F, Primary>(
75 self,
76 cap: &Capability,
77 primary: F,
78 ) -> Result<RlpxSatelliteStream<St, Primary>, P2PStreamError>
79 where
80 F: FnOnce(ProtocolProxy) -> Primary,
81 {
82 let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
83 else {
84 return Err(P2PStreamError::CapabilityNotShared)
85 };
86
87 let (to_primary, from_wire) = mpsc::unbounded_channel();
88 let (to_wire, from_primary) = mpsc::unbounded_channel();
89 let proxy = ProtocolProxy {
90 shared_cap: shared_cap.clone(),
91 from_wire: UnboundedReceiverStream::new(from_wire),
92 to_wire,
93 };
94
95 let st = primary(proxy);
96 Ok(RlpxSatelliteStream {
97 inner: self.inner,
98 primary: PrimaryProtocol {
99 to_primary,
100 from_primary: UnboundedReceiverStream::new(from_primary),
101 st,
102 shared_cap,
103 },
104 })
105 }
106
107 pub async fn into_satellite_stream_with_handshake<F, Fut, Err, Primary>(
112 self,
113 cap: &Capability,
114 handshake: F,
115 ) -> Result<RlpxSatelliteStream<St, Primary>, Err>
116 where
117 F: FnOnce(ProtocolProxy) -> Fut,
118 Fut: Future<Output = Result<Primary, Err>>,
119 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
120 P2PStreamError: Into<Err>,
121 {
122 self.into_satellite_stream_with_tuple_handshake(cap, move |proxy| async move {
123 let st = handshake(proxy).await?;
124 Ok((st, ()))
125 })
126 .await
127 .map(|(st, _)| st)
128 }
129
130 pub async fn into_satellite_stream_with_tuple_handshake<F, Fut, Err, Primary, Extra>(
140 mut self,
141 cap: &Capability,
142 handshake: F,
143 ) -> Result<(RlpxSatelliteStream<St, Primary>, Extra), Err>
144 where
145 F: FnOnce(ProtocolProxy) -> Fut,
146 Fut: Future<Output = Result<(Primary, Extra), Err>>,
147 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
148 P2PStreamError: Into<Err>,
149 {
150 let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
151 else {
152 return Err(P2PStreamError::CapabilityNotShared.into())
153 };
154
155 let (to_primary, from_wire) = mpsc::unbounded_channel();
156 let (to_wire, mut from_primary) = mpsc::unbounded_channel();
157 let proxy = ProtocolProxy {
158 shared_cap: shared_cap.clone(),
159 from_wire: UnboundedReceiverStream::new(from_wire),
160 to_wire,
161 };
162
163 let f = handshake(proxy);
164 let mut f = pin!(f);
165
166 loop {
169 tokio::select! {
170 Some(Ok(msg)) = self.inner.conn.next() => {
171 let Some(offset) = msg.first().copied()
173 else {
174 return Err(P2PStreamError::EmptyProtocolMessage.into())
175 };
176 if let Some(cap) = self.shared_capabilities().find_by_relative_offset(offset).cloned() {
177 if cap == shared_cap {
178 let _ = to_primary.send(msg);
180 } else {
181 self.inner.delegate_message(&cap, msg);
183 }
184 } else {
185 return Err(P2PStreamError::UnknownReservedMessageId(offset).into())
186 }
187 }
188 Some(msg) = from_primary.recv() => {
189 self.inner.conn.send(msg).await.map_err(Into::into)?;
190 }
191 res = &mut f => {
192 let (st, extra) = res?;
193 return Ok((RlpxSatelliteStream {
194 inner: self.inner,
195 primary: PrimaryProtocol {
196 to_primary,
197 from_primary: UnboundedReceiverStream::new(from_primary),
198 st,
199 shared_cap,
200 }
201 }, extra))
202 }
203 }
204 }
205 }
206
207 pub async fn into_eth_satellite_stream<N: NetworkPrimitives>(
210 self,
211 status: UnifiedStatus,
212 fork_filter: ForkFilter,
213 ) -> Result<(RlpxSatelliteStream<St, EthStream<ProtocolProxy, N>>, UnifiedStatus), EthStreamError>
214 where
215 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
216 {
217 let eth_cap = self.inner.conn.shared_capabilities().eth_version()?;
218 self.into_satellite_stream_with_tuple_handshake(
219 &Capability::eth(eth_cap),
220 move |proxy| async move {
221 UnauthedEthStream::new(proxy).handshake(status, fork_filter).await
222 },
223 )
224 .await
225 }
226}
227
228#[derive(Debug)]
229struct MultiplexInner<St> {
230 conn: P2PStream<St>,
232 protocols: Vec<ProtocolStream>,
234 out_buffer: VecDeque<Bytes>,
236}
237
238impl<St> MultiplexInner<St> {
239 const fn shared_capabilities(&self) -> &SharedCapabilities {
240 self.conn.shared_capabilities()
241 }
242
243 fn delegate_message(&self, cap: &SharedCapability, msg: BytesMut) -> bool {
245 for proto in &self.protocols {
246 if proto.shared_cap == *cap {
247 proto.send_raw(msg);
248 return true
249 }
250 }
251 false
252 }
253
254 fn install_protocol<F, Proto>(
255 &mut self,
256 cap: &Capability,
257 f: F,
258 ) -> Result<(), UnsupportedCapabilityError>
259 where
260 F: FnOnce(ProtocolConnection) -> Proto,
261 Proto: Stream<Item = BytesMut> + Send + 'static,
262 {
263 let shared_cap =
264 self.conn.shared_capabilities().ensure_matching_capability(cap).cloned()?;
265 let (to_satellite, rx) = mpsc::unbounded_channel();
266 let proto_conn = ProtocolConnection { from_wire: UnboundedReceiverStream::new(rx) };
267 let st = f(proto_conn);
268 let st = ProtocolStream { shared_cap, to_satellite, satellite_st: Box::pin(st) };
269 self.protocols.push(st);
270 Ok(())
271 }
272}
273
274#[derive(Debug)]
276struct PrimaryProtocol<Primary> {
277 to_primary: UnboundedSender<BytesMut>,
279 from_primary: UnboundedReceiverStream<Bytes>,
281 shared_cap: SharedCapability,
283 st: Primary,
285}
286
287#[derive(Debug)]
291pub struct ProtocolProxy {
292 shared_cap: SharedCapability,
293 from_wire: UnboundedReceiverStream<BytesMut>,
295 to_wire: UnboundedSender<Bytes>,
297}
298
299impl ProtocolProxy {
300 fn try_send(&self, msg: Bytes) -> Result<(), io::Error> {
302 if msg.is_empty() {
303 return Err(io::ErrorKind::InvalidInput.into())
305 }
306 self.to_wire.send(self.mask_msg_id(msg)?).map_err(|_| io::ErrorKind::BrokenPipe.into())
307 }
308
309 #[inline]
311 fn mask_msg_id(&self, msg: Bytes) -> Result<Bytes, io::Error> {
312 if msg.is_empty() {
313 return Err(io::ErrorKind::InvalidInput.into())
315 }
316
317 let offset = self.shared_cap.relative_message_id_offset();
318 if offset == 0 {
319 return Ok(msg);
320 }
321
322 let mut masked = Vec::from(msg);
323 masked[0] = masked[0].checked_add(offset).ok_or(io::ErrorKind::InvalidInput)?;
324 Ok(masked.into())
325 }
326
327 #[inline]
329 fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
330 if msg.is_empty() {
331 return Err(io::ErrorKind::InvalidInput.into())
333 }
334 msg[0] = msg[0]
335 .checked_sub(self.shared_cap.relative_message_id_offset())
336 .ok_or(io::ErrorKind::InvalidInput)?;
337 Ok(msg)
338 }
339}
340
341impl Stream for ProtocolProxy {
342 type Item = Result<BytesMut, io::Error>;
343
344 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
345 let msg = ready!(self.from_wire.poll_next_unpin(cx));
346 Poll::Ready(msg.map(|msg| self.get_mut().unmask_id(msg)))
347 }
348}
349
350impl Sink<Bytes> for ProtocolProxy {
351 type Error = io::Error;
352
353 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
354 Poll::Ready(Ok(()))
355 }
356
357 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
358 self.get_mut().try_send(item)
359 }
360
361 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
362 Poll::Ready(Ok(()))
363 }
364
365 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
366 Poll::Ready(Ok(()))
367 }
368}
369
370impl CanDisconnect<Bytes> for ProtocolProxy {
371 fn disconnect(
372 &mut self,
373 _reason: DisconnectReason,
374 ) -> Pin<Box<dyn Future<Output = Result<(), <Self as Sink<Bytes>>::Error>> + Send + '_>> {
375 Box::pin(async move { Ok(()) })
377 }
378}
379
380#[derive(Debug)]
384pub struct ProtocolConnection {
385 from_wire: UnboundedReceiverStream<BytesMut>,
386}
387
388impl Stream for ProtocolConnection {
389 type Item = BytesMut;
390
391 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
392 self.from_wire.poll_next_unpin(cx)
393 }
394}
395
396#[derive(Debug)]
399pub struct RlpxSatelliteStream<St, Primary> {
400 inner: MultiplexInner<St>,
401 primary: PrimaryProtocol<Primary>,
402}
403
404impl<St, Primary> RlpxSatelliteStream<St, Primary> {
405 pub fn install_protocol<F, Proto>(
410 &mut self,
411 cap: &Capability,
412 f: F,
413 ) -> Result<(), UnsupportedCapabilityError>
414 where
415 F: FnOnce(ProtocolConnection) -> Proto,
416 Proto: Stream<Item = BytesMut> + Send + 'static,
417 {
418 self.inner.install_protocol(cap, f)
419 }
420
421 #[inline]
423 pub const fn primary(&self) -> &Primary {
424 &self.primary.st
425 }
426
427 #[inline]
429 pub const fn primary_mut(&mut self) -> &mut Primary {
430 &mut self.primary.st
431 }
432
433 #[inline]
435 pub const fn inner(&self) -> &P2PStream<St> {
436 &self.inner.conn
437 }
438
439 #[inline]
441 pub const fn inner_mut(&mut self) -> &mut P2PStream<St> {
442 &mut self.inner.conn
443 }
444
445 #[inline]
447 pub fn into_inner(self) -> P2PStream<St> {
448 self.inner.conn
449 }
450}
451
452impl<St, Primary, PrimaryErr> Stream for RlpxSatelliteStream<St, Primary>
453where
454 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
455 Primary: TryStream<Error = PrimaryErr> + Unpin,
456 P2PStreamError: Into<PrimaryErr>,
457{
458 type Item = Result<Primary::Ok, Primary::Error>;
459
460 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
461 let this = self.get_mut();
462
463 loop {
464 if let Poll::Ready(Some(msg)) = this.primary.st.try_poll_next_unpin(cx) {
466 return Poll::Ready(Some(msg))
467 }
468
469 let mut conn_ready = true;
470 loop {
471 match this.inner.conn.poll_ready_unpin(cx) {
472 Poll::Ready(Ok(())) => {
473 if let Some(msg) = this.inner.out_buffer.pop_front() {
474 if let Err(err) = this.inner.conn.start_send_unpin(msg) {
475 return Poll::Ready(Some(Err(err.into())))
476 }
477 } else {
478 break
479 }
480 }
481 Poll::Ready(Err(err)) => {
482 if let Err(disconnect_err) =
483 this.inner.conn.start_disconnect(DisconnectReason::DisconnectRequested)
484 {
485 return Poll::Ready(Some(Err(disconnect_err.into())))
486 }
487 return Poll::Ready(Some(Err(err.into())))
488 }
489 Poll::Pending => {
490 conn_ready = false;
491 break
492 }
493 }
494 }
495
496 loop {
498 match this.primary.from_primary.poll_next_unpin(cx) {
499 Poll::Ready(Some(msg)) => {
500 this.inner.out_buffer.push_back(msg);
501 }
502 Poll::Ready(None) => {
503 return Poll::Ready(None)
505 }
506 Poll::Pending => break,
507 }
508 }
509
510 for idx in (0..this.inner.protocols.len()).rev() {
512 let mut proto = this.inner.protocols.swap_remove(idx);
513 loop {
514 match proto.poll_next_unpin(cx) {
515 Poll::Ready(Some(Err(err))) => {
516 return Poll::Ready(Some(Err(P2PStreamError::Io(err).into())))
517 }
518 Poll::Ready(Some(Ok(msg))) => {
519 this.inner.out_buffer.push_back(msg);
520 }
521 Poll::Ready(None) => return Poll::Ready(None),
522 Poll::Pending => {
523 this.inner.protocols.push(proto);
524 break
525 }
526 }
527 }
528 }
529
530 let mut delegated = false;
531 loop {
532 match this.inner.conn.poll_next_unpin(cx) {
534 Poll::Ready(Some(Ok(msg))) => {
535 delegated = true;
536 let Some(offset) = msg.first().copied() else {
537 return Poll::Ready(Some(Err(
538 P2PStreamError::EmptyProtocolMessage.into()
539 )))
540 };
541 if let Some(cap) =
543 this.inner.conn.shared_capabilities().find_by_relative_offset(offset)
544 {
545 if cap == &this.primary.shared_cap {
546 let _ = this.primary.to_primary.send(msg);
548 } else {
549 for proto in &this.inner.protocols {
551 if proto.shared_cap == *cap {
552 proto.send_raw(msg);
553 break
554 }
555 }
556 }
557 } else {
558 return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(
559 offset,
560 )
561 .into())))
562 }
563 }
564 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
565 Poll::Ready(None) => {
566 return Poll::Ready(None)
568 }
569 Poll::Pending => break,
570 }
571 }
572
573 if !conn_ready || (!delegated && this.inner.out_buffer.is_empty()) {
574 return Poll::Pending
575 }
576 }
577 }
578}
579
580impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
581where
582 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
583 Primary: Sink<T> + Unpin,
584 P2PStreamError: Into<<Primary as Sink<T>>::Error>,
585{
586 type Error = <Primary as Sink<T>>::Error;
587
588 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
589 let this = self.get_mut();
590 if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) {
591 return Poll::Ready(Err(err.into()))
592 }
593 if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) {
594 return Poll::Ready(Err(err))
595 }
596 Poll::Ready(Ok(()))
597 }
598
599 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
600 self.get_mut().primary.st.start_send_unpin(item)
601 }
602
603 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
604 self.get_mut().inner.conn.poll_flush_unpin(cx).map_err(Into::into)
605 }
606
607 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
608 self.get_mut().inner.conn.poll_close_unpin(cx).map_err(Into::into)
609 }
610}
611
612struct ProtocolStream {
614 shared_cap: SharedCapability,
615 to_satellite: UnboundedSender<BytesMut>,
617 satellite_st: Pin<Box<dyn Stream<Item = BytesMut> + Send>>,
618}
619
620impl ProtocolStream {
621 #[inline]
623 fn mask_msg_id(&self, mut msg: BytesMut) -> Result<Bytes, io::Error> {
624 if msg.is_empty() {
625 return Err(io::ErrorKind::InvalidInput.into())
627 }
628 msg[0] = msg[0]
629 .checked_add(self.shared_cap.relative_message_id_offset())
630 .ok_or(io::ErrorKind::InvalidInput)?;
631 Ok(msg.freeze())
632 }
633
634 #[inline]
636 fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
637 if msg.is_empty() {
638 return Err(io::ErrorKind::InvalidInput.into())
640 }
641 msg[0] = msg[0]
642 .checked_sub(self.shared_cap.relative_message_id_offset())
643 .ok_or(io::ErrorKind::InvalidInput)?;
644 Ok(msg)
645 }
646
647 fn send_raw(&self, msg: BytesMut) {
649 let _ = self.unmask_id(msg).map(|msg| self.to_satellite.send(msg));
650 }
651}
652
653impl Stream for ProtocolStream {
654 type Item = Result<Bytes, io::Error>;
655
656 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
657 let this = self.get_mut();
658 let msg = ready!(this.satellite_st.as_mut().poll_next(cx));
659 Poll::Ready(msg.map(|msg| this.mask_msg_id(msg)))
660 }
661}
662
663impl fmt::Debug for ProtocolStream {
664 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665 f.debug_struct("ProtocolStream").field("cap", &self.shared_cap).finish_non_exhaustive()
666 }
667}
668
669#[cfg(test)]
670mod tests {
671 use super::*;
672 use crate::{
673 test_utils::{
674 connect_passthrough, eth_handshake, eth_hello,
675 proto::{test_hello, TestProtoMessage},
676 },
677 UnauthedP2PStream,
678 };
679 use reth_eth_wire_types::EthNetworkPrimitives;
680 use tokio::{net::TcpListener, sync::oneshot};
681 use tokio_util::codec::Decoder;
682
683 #[tokio::test]
684 async fn eth_satellite() {
685 reth_tracing::init_test_tracing();
686 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
687 let local_addr = listener.local_addr().unwrap();
688 let (status, fork_filter) = eth_handshake();
689 let other_status = status;
690 let other_fork_filter = fork_filter.clone();
691 let _handle = tokio::spawn(async move {
692 let (incoming, _) = listener.accept().await.unwrap();
693 let stream = crate::PassthroughCodec::default().framed(incoming);
694 let (server_hello, _) = eth_hello();
695 let (p2p_stream, _) =
696 UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();
697
698 let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream)
699 .handshake::<EthNetworkPrimitives>(other_status, other_fork_filter)
700 .await
701 .unwrap();
702
703 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
704 });
705
706 let conn = connect_passthrough(local_addr, eth_hello().0).await;
707 let eth = conn.shared_capabilities().eth().unwrap().clone();
708
709 let multiplexer = RlpxProtocolMultiplexer::new(conn);
710 let _satellite = multiplexer
711 .into_satellite_stream_with_handshake(
712 eth.capability().as_ref(),
713 move |proxy| async move {
714 UnauthedEthStream::new(proxy)
715 .handshake::<EthNetworkPrimitives>(status, fork_filter)
716 .await
717 },
718 )
719 .await
720 .unwrap();
721 }
722
723 #[tokio::test(flavor = "multi_thread")]
725 async fn eth_test_protocol_satellite() {
726 reth_tracing::init_test_tracing();
727 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
728 let local_addr = listener.local_addr().unwrap();
729 let (status, fork_filter) = eth_handshake();
730 let other_status = status;
731 let other_fork_filter = fork_filter.clone();
732 let _handle = tokio::spawn(async move {
733 let (incoming, _) = listener.accept().await.unwrap();
734 let stream = crate::PassthroughCodec::default().framed(incoming);
735 let (server_hello, _) = test_hello();
736 let (conn, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();
737
738 let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
739 .into_eth_satellite_stream::<EthNetworkPrimitives>(other_status, other_fork_filter)
740 .await
741 .unwrap();
742
743 st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
744 async_stream::stream! {
745 yield TestProtoMessage::ping().encoded();
746 let msg = conn.next().await.unwrap();
747 let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
748 assert_eq!(msg, TestProtoMessage::pong());
749
750 yield TestProtoMessage::message("hello").encoded();
751 let msg = conn.next().await.unwrap();
752 let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
753 assert_eq!(msg, TestProtoMessage::message("good bye!"));
754
755 yield TestProtoMessage::message("good bye!").encoded();
756
757 futures::future::pending::<()>().await;
758 unreachable!()
759 }
760 })
761 .unwrap();
762
763 loop {
764 let _ = st.next().await;
765 }
766 });
767
768 let conn = connect_passthrough(local_addr, test_hello().0).await;
769 let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
770 .into_eth_satellite_stream::<EthNetworkPrimitives>(status, fork_filter)
771 .await
772 .unwrap();
773
774 let (tx, mut rx) = oneshot::channel();
775
776 st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
777 async_stream::stream! {
778 let msg = conn.next().await.unwrap();
779 let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
780 assert_eq!(msg, TestProtoMessage::ping());
781
782 yield TestProtoMessage::pong().encoded();
783
784 let msg = conn.next().await.unwrap();
785 let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
786 assert_eq!(msg, TestProtoMessage::message("hello"));
787
788 yield TestProtoMessage::message("good bye!").encoded();
789
790 let msg = conn.next().await.unwrap();
791 let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
792 assert_eq!(msg, TestProtoMessage::message("good bye!"));
793
794 tx.send(()).unwrap();
795
796 futures::future::pending::<()>().await;
797 unreachable!()
798 }
799 })
800 .unwrap();
801
802 loop {
803 tokio::select! {
804 _ = &mut rx => {
805 break
806 }
807 _ = st.next() => {
808 }
809 }
810 }
811 }
812}