reth_eth_wire/
multiplex.rs

1//! Rlpx protocol multiplexer and satellite stream
2//!
3//! A Satellite is a Stream that primarily drives a single `RLPx` subprotocol but can also handle
4//! additional subprotocols.
5//!
6//! Most of other subprotocols are "dependent satellite" protocols of "eth" and not a fully standalone protocol, for example "snap", See also [snap protocol](https://github.com/ethereum/devp2p/blob/298d7a77c3bf833641579ecbbb5b13f0311eeeea/caps/snap.md?plain=1#L71)
7//! Hence it is expected that the primary protocol is "eth" and the additional protocols are
8//! "dependent satellite" protocols.
9
10use std::{
11    collections::VecDeque,
12    fmt,
13    future::Future,
14    io,
15    pin::{pin, Pin},
16    task::{ready, Context, Poll},
17};
18
19use crate::{
20    capability::{SharedCapabilities, SharedCapability, UnsupportedCapabilityError},
21    errors::{EthStreamError, P2PStreamError},
22    p2pstream::DisconnectP2P,
23    CanDisconnect, Capability, DisconnectReason, EthStream, P2PStream, UnauthedEthStream,
24    UnifiedStatus,
25};
26use bytes::{Bytes, BytesMut};
27use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
28use reth_eth_wire_types::NetworkPrimitives;
29use reth_ethereum_forks::ForkFilter;
30use tokio::sync::{mpsc, mpsc::UnboundedSender};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32
33/// A Stream and Sink type that wraps a raw rlpx stream [`P2PStream`] and handles message ID
34/// multiplexing.
35#[derive(Debug)]
36pub struct RlpxProtocolMultiplexer<St> {
37    inner: MultiplexInner<St>,
38}
39
40impl<St> RlpxProtocolMultiplexer<St> {
41    /// Wraps the raw p2p stream
42    pub fn new(conn: P2PStream<St>) -> Self {
43        Self {
44            inner: MultiplexInner {
45                conn,
46                protocols: Default::default(),
47                out_buffer: Default::default(),
48            },
49        }
50    }
51
52    /// Installs a new protocol on top of the raw p2p stream.
53    ///
54    /// This accepts a closure that receives a [`ProtocolConnection`] that will yield messages for
55    /// the given capability.
56    pub fn install_protocol<F, Proto>(
57        &mut self,
58        cap: &Capability,
59        f: F,
60    ) -> Result<(), UnsupportedCapabilityError>
61    where
62        F: FnOnce(ProtocolConnection) -> Proto,
63        Proto: Stream<Item = BytesMut> + Send + 'static,
64    {
65        self.inner.install_protocol(cap, f)
66    }
67
68    /// Returns the [`SharedCapabilities`] of the underlying raw p2p stream
69    pub const fn shared_capabilities(&self) -> &SharedCapabilities {
70        self.inner.shared_capabilities()
71    }
72
73    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol.
74    pub fn into_satellite_stream<F, Primary>(
75        self,
76        cap: &Capability,
77        primary: F,
78    ) -> Result<RlpxSatelliteStream<St, Primary>, P2PStreamError>
79    where
80        F: FnOnce(ProtocolProxy) -> Primary,
81    {
82        let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
83        else {
84            return Err(P2PStreamError::CapabilityNotShared)
85        };
86
87        let (to_primary, from_wire) = mpsc::unbounded_channel();
88        let (to_wire, from_primary) = mpsc::unbounded_channel();
89        let proxy = ProtocolProxy {
90            shared_cap: shared_cap.clone(),
91            from_wire: UnboundedReceiverStream::new(from_wire),
92            to_wire,
93        };
94
95        let st = primary(proxy);
96        Ok(RlpxSatelliteStream {
97            inner: self.inner,
98            primary: PrimaryProtocol {
99                to_primary,
100                from_primary: UnboundedReceiverStream::new(from_primary),
101                st,
102                shared_cap,
103            },
104        })
105    }
106
107    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol.
108    ///
109    /// Returns an error if the primary protocol is not supported by the remote or the handshake
110    /// failed.
111    pub async fn into_satellite_stream_with_handshake<F, Fut, Err, Primary>(
112        self,
113        cap: &Capability,
114        handshake: F,
115    ) -> Result<RlpxSatelliteStream<St, Primary>, Err>
116    where
117        F: FnOnce(ProtocolProxy) -> Fut,
118        Fut: Future<Output = Result<Primary, Err>>,
119        St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
120        P2PStreamError: Into<Err>,
121    {
122        self.into_satellite_stream_with_tuple_handshake(cap, move |proxy| async move {
123            let st = handshake(proxy).await?;
124            Ok((st, ()))
125        })
126        .await
127        .map(|(st, _)| st)
128    }
129
130    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol.
131    ///
132    /// Returns an error if the primary protocol is not supported by the remote or the handshake
133    /// failed.
134    ///
135    /// This accepts a closure that does a handshake with the remote peer and returns a tuple of the
136    /// primary stream and extra data.
137    ///
138    /// See also [`UnauthedEthStream::handshake`]
139    pub async fn into_satellite_stream_with_tuple_handshake<F, Fut, Err, Primary, Extra>(
140        mut self,
141        cap: &Capability,
142        handshake: F,
143    ) -> Result<(RlpxSatelliteStream<St, Primary>, Extra), Err>
144    where
145        F: FnOnce(ProtocolProxy) -> Fut,
146        Fut: Future<Output = Result<(Primary, Extra), Err>>,
147        St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
148        P2PStreamError: Into<Err>,
149    {
150        let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
151        else {
152            return Err(P2PStreamError::CapabilityNotShared.into())
153        };
154
155        let (to_primary, from_wire) = mpsc::unbounded_channel();
156        let (to_wire, mut from_primary) = mpsc::unbounded_channel();
157        let proxy = ProtocolProxy {
158            shared_cap: shared_cap.clone(),
159            from_wire: UnboundedReceiverStream::new(from_wire),
160            to_wire,
161        };
162
163        let f = handshake(proxy);
164        let mut f = pin!(f);
165
166        // this polls the connection and the primary stream concurrently until the handshake is
167        // complete
168        loop {
169            tokio::select! {
170                Some(Ok(msg)) = self.inner.conn.next() => {
171                    // Ensure the message belongs to the primary protocol
172                    let Some(offset) = msg.first().copied()
173                    else {
174                        return Err(P2PStreamError::EmptyProtocolMessage.into())
175                    };
176                    if let Some(cap) = self.shared_capabilities().find_by_relative_offset(offset).cloned() {
177                            if cap == shared_cap {
178                                // delegate to primary
179                                let _ = to_primary.send(msg);
180                            } else {
181                                // delegate to satellite
182                                self.inner.delegate_message(&cap, msg);
183                            }
184                        } else {
185                           return Err(P2PStreamError::UnknownReservedMessageId(offset).into())
186                        }
187                }
188                Some(msg) = from_primary.recv() => {
189                    self.inner.conn.send(msg).await.map_err(Into::into)?;
190                }
191                res = &mut f => {
192                    let (st, extra) = res?;
193                    return Ok((RlpxSatelliteStream {
194                            inner: self.inner,
195                            primary: PrimaryProtocol {
196                                to_primary,
197                                from_primary: UnboundedReceiverStream::new(from_primary),
198                                st,
199                                shared_cap,
200                            }
201                    }, extra))
202                }
203            }
204        }
205    }
206
207    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with eth protocol as the given
208    /// primary protocol.
209    pub async fn into_eth_satellite_stream<N: NetworkPrimitives>(
210        self,
211        status: UnifiedStatus,
212        fork_filter: ForkFilter,
213    ) -> Result<(RlpxSatelliteStream<St, EthStream<ProtocolProxy, N>>, UnifiedStatus), EthStreamError>
214    where
215        St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
216    {
217        let eth_cap = self.inner.conn.shared_capabilities().eth_version()?;
218        self.into_satellite_stream_with_tuple_handshake(
219            &Capability::eth(eth_cap),
220            move |proxy| async move {
221                UnauthedEthStream::new(proxy).handshake(status, fork_filter).await
222            },
223        )
224        .await
225    }
226}
227
228#[derive(Debug)]
229struct MultiplexInner<St> {
230    /// The raw p2p stream
231    conn: P2PStream<St>,
232    /// All the subprotocols that are multiplexed on top of the raw p2p stream
233    protocols: Vec<ProtocolStream>,
234    /// Buffer for outgoing messages on the wire.
235    out_buffer: VecDeque<Bytes>,
236}
237
238impl<St> MultiplexInner<St> {
239    const fn shared_capabilities(&self) -> &SharedCapabilities {
240        self.conn.shared_capabilities()
241    }
242
243    /// Delegates a message to the matching protocol.
244    fn delegate_message(&self, cap: &SharedCapability, msg: BytesMut) -> bool {
245        for proto in &self.protocols {
246            if proto.shared_cap == *cap {
247                proto.send_raw(msg);
248                return true
249            }
250        }
251        false
252    }
253
254    fn install_protocol<F, Proto>(
255        &mut self,
256        cap: &Capability,
257        f: F,
258    ) -> Result<(), UnsupportedCapabilityError>
259    where
260        F: FnOnce(ProtocolConnection) -> Proto,
261        Proto: Stream<Item = BytesMut> + Send + 'static,
262    {
263        let shared_cap =
264            self.conn.shared_capabilities().ensure_matching_capability(cap).cloned()?;
265        let (to_satellite, rx) = mpsc::unbounded_channel();
266        let proto_conn = ProtocolConnection { from_wire: UnboundedReceiverStream::new(rx) };
267        let st = f(proto_conn);
268        let st = ProtocolStream { shared_cap, to_satellite, satellite_st: Box::pin(st) };
269        self.protocols.push(st);
270        Ok(())
271    }
272}
273
274/// Represents a protocol in the multiplexer that is used as the primary protocol.
275#[derive(Debug)]
276struct PrimaryProtocol<Primary> {
277    /// Channel to send messages to the primary protocol.
278    to_primary: UnboundedSender<BytesMut>,
279    /// Receiver for messages from the primary protocol.
280    from_primary: UnboundedReceiverStream<Bytes>,
281    /// Shared capability of the primary protocol.
282    shared_cap: SharedCapability,
283    /// The primary stream.
284    st: Primary,
285}
286
287/// A Stream and Sink type that acts as a wrapper around a primary `RLPx` subprotocol (e.g. "eth")
288///
289/// Only emits and sends _non-empty_ messages
290#[derive(Debug)]
291pub struct ProtocolProxy {
292    shared_cap: SharedCapability,
293    /// Receives _non-empty_ messages from the wire
294    from_wire: UnboundedReceiverStream<BytesMut>,
295    /// Sends _non-empty_ messages from the wire
296    to_wire: UnboundedSender<Bytes>,
297}
298
299impl ProtocolProxy {
300    /// Sends a _non-empty_ message on the wire.
301    fn try_send(&self, msg: Bytes) -> Result<(), io::Error> {
302        if msg.is_empty() {
303            // message must not be empty
304            return Err(io::ErrorKind::InvalidInput.into())
305        }
306        self.to_wire.send(self.mask_msg_id(msg)?).map_err(|_| io::ErrorKind::BrokenPipe.into())
307    }
308
309    /// Masks the message ID of a message to be sent on the wire.
310    #[inline]
311    fn mask_msg_id(&self, msg: Bytes) -> Result<Bytes, io::Error> {
312        if msg.is_empty() {
313            // message must not be empty
314            return Err(io::ErrorKind::InvalidInput.into())
315        }
316
317        let offset = self.shared_cap.relative_message_id_offset();
318        if offset == 0 {
319            return Ok(msg);
320        }
321
322        let mut masked = Vec::from(msg);
323        masked[0] = masked[0].checked_add(offset).ok_or(io::ErrorKind::InvalidInput)?;
324        Ok(masked.into())
325    }
326
327    /// Unmasks the message ID of a message received from the wire.
328    #[inline]
329    fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
330        if msg.is_empty() {
331            // message must not be empty
332            return Err(io::ErrorKind::InvalidInput.into())
333        }
334        msg[0] = msg[0]
335            .checked_sub(self.shared_cap.relative_message_id_offset())
336            .ok_or(io::ErrorKind::InvalidInput)?;
337        Ok(msg)
338    }
339}
340
341impl Stream for ProtocolProxy {
342    type Item = Result<BytesMut, io::Error>;
343
344    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
345        let msg = ready!(self.from_wire.poll_next_unpin(cx));
346        Poll::Ready(msg.map(|msg| self.get_mut().unmask_id(msg)))
347    }
348}
349
350impl Sink<Bytes> for ProtocolProxy {
351    type Error = io::Error;
352
353    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
354        Poll::Ready(Ok(()))
355    }
356
357    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
358        self.get_mut().try_send(item)
359    }
360
361    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
362        Poll::Ready(Ok(()))
363    }
364
365    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
366        Poll::Ready(Ok(()))
367    }
368}
369
370impl CanDisconnect<Bytes> for ProtocolProxy {
371    fn disconnect(
372        &mut self,
373        _reason: DisconnectReason,
374    ) -> Pin<Box<dyn Future<Output = Result<(), <Self as Sink<Bytes>>::Error>> + Send + '_>> {
375        // TODO handle disconnects
376        Box::pin(async move { Ok(()) })
377    }
378}
379
380/// A connection channel to receive _`non_empty`_ messages for the negotiated protocol.
381///
382/// This is a [Stream] that returns raw bytes of the received messages for this protocol.
383#[derive(Debug)]
384pub struct ProtocolConnection {
385    from_wire: UnboundedReceiverStream<BytesMut>,
386}
387
388impl Stream for ProtocolConnection {
389    type Item = BytesMut;
390
391    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
392        self.from_wire.poll_next_unpin(cx)
393    }
394}
395
396/// A Stream and Sink type that acts as a wrapper around a primary `RLPx` subprotocol (e.g. "eth")
397/// [`EthStream`] and can also handle additional subprotocols.
398#[derive(Debug)]
399pub struct RlpxSatelliteStream<St, Primary> {
400    inner: MultiplexInner<St>,
401    primary: PrimaryProtocol<Primary>,
402}
403
404impl<St, Primary> RlpxSatelliteStream<St, Primary> {
405    /// Installs a new protocol on top of the raw p2p stream.
406    ///
407    /// This accepts a closure that receives a [`ProtocolConnection`] that will yield messages for
408    /// the given capability.
409    pub fn install_protocol<F, Proto>(
410        &mut self,
411        cap: &Capability,
412        f: F,
413    ) -> Result<(), UnsupportedCapabilityError>
414    where
415        F: FnOnce(ProtocolConnection) -> Proto,
416        Proto: Stream<Item = BytesMut> + Send + 'static,
417    {
418        self.inner.install_protocol(cap, f)
419    }
420
421    /// Returns the primary protocol.
422    #[inline]
423    pub const fn primary(&self) -> &Primary {
424        &self.primary.st
425    }
426
427    /// Returns mutable access to the primary protocol.
428    #[inline]
429    pub const fn primary_mut(&mut self) -> &mut Primary {
430        &mut self.primary.st
431    }
432
433    /// Returns the underlying [`P2PStream`].
434    #[inline]
435    pub const fn inner(&self) -> &P2PStream<St> {
436        &self.inner.conn
437    }
438
439    /// Returns mutable access to the underlying [`P2PStream`].
440    #[inline]
441    pub const fn inner_mut(&mut self) -> &mut P2PStream<St> {
442        &mut self.inner.conn
443    }
444
445    /// Consumes this type and returns the wrapped [`P2PStream`].
446    #[inline]
447    pub fn into_inner(self) -> P2PStream<St> {
448        self.inner.conn
449    }
450}
451
452impl<St, Primary, PrimaryErr> Stream for RlpxSatelliteStream<St, Primary>
453where
454    St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
455    Primary: TryStream<Error = PrimaryErr> + Unpin,
456    P2PStreamError: Into<PrimaryErr>,
457{
458    type Item = Result<Primary::Ok, Primary::Error>;
459
460    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
461        let this = self.get_mut();
462
463        loop {
464            // first drain the primary stream
465            if let Poll::Ready(Some(msg)) = this.primary.st.try_poll_next_unpin(cx) {
466                return Poll::Ready(Some(msg))
467            }
468
469            let mut conn_ready = true;
470            loop {
471                match this.inner.conn.poll_ready_unpin(cx) {
472                    Poll::Ready(Ok(())) => {
473                        if let Some(msg) = this.inner.out_buffer.pop_front() {
474                            if let Err(err) = this.inner.conn.start_send_unpin(msg) {
475                                return Poll::Ready(Some(Err(err.into())))
476                            }
477                        } else {
478                            break
479                        }
480                    }
481                    Poll::Ready(Err(err)) => {
482                        if let Err(disconnect_err) =
483                            this.inner.conn.start_disconnect(DisconnectReason::DisconnectRequested)
484                        {
485                            return Poll::Ready(Some(Err(disconnect_err.into())))
486                        }
487                        return Poll::Ready(Some(Err(err.into())))
488                    }
489                    Poll::Pending => {
490                        conn_ready = false;
491                        break
492                    }
493                }
494            }
495
496            // advance primary out
497            loop {
498                match this.primary.from_primary.poll_next_unpin(cx) {
499                    Poll::Ready(Some(msg)) => {
500                        this.inner.out_buffer.push_back(msg);
501                    }
502                    Poll::Ready(None) => {
503                        // primary closed
504                        return Poll::Ready(None)
505                    }
506                    Poll::Pending => break,
507                }
508            }
509
510            // advance all satellites
511            for idx in (0..this.inner.protocols.len()).rev() {
512                let mut proto = this.inner.protocols.swap_remove(idx);
513                loop {
514                    match proto.poll_next_unpin(cx) {
515                        Poll::Ready(Some(Err(err))) => {
516                            return Poll::Ready(Some(Err(P2PStreamError::Io(err).into())))
517                        }
518                        Poll::Ready(Some(Ok(msg))) => {
519                            this.inner.out_buffer.push_back(msg);
520                        }
521                        Poll::Ready(None) => return Poll::Ready(None),
522                        Poll::Pending => {
523                            this.inner.protocols.push(proto);
524                            break
525                        }
526                    }
527                }
528            }
529
530            let mut delegated = false;
531            loop {
532                // pull messages from connection
533                match this.inner.conn.poll_next_unpin(cx) {
534                    Poll::Ready(Some(Ok(msg))) => {
535                        delegated = true;
536                        let Some(offset) = msg.first().copied() else {
537                            return Poll::Ready(Some(Err(
538                                P2PStreamError::EmptyProtocolMessage.into()
539                            )))
540                        };
541                        // delegate the multiplexed message to the correct protocol
542                        if let Some(cap) =
543                            this.inner.conn.shared_capabilities().find_by_relative_offset(offset)
544                        {
545                            if cap == &this.primary.shared_cap {
546                                // delegate to primary
547                                let _ = this.primary.to_primary.send(msg);
548                            } else {
549                                // delegate to installed satellite if any
550                                for proto in &this.inner.protocols {
551                                    if proto.shared_cap == *cap {
552                                        proto.send_raw(msg);
553                                        break
554                                    }
555                                }
556                            }
557                        } else {
558                            return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(
559                                offset,
560                            )
561                            .into())))
562                        }
563                    }
564                    Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
565                    Poll::Ready(None) => {
566                        // connection closed
567                        return Poll::Ready(None)
568                    }
569                    Poll::Pending => break,
570                }
571            }
572
573            if !conn_ready || (!delegated && this.inner.out_buffer.is_empty()) {
574                return Poll::Pending
575            }
576        }
577    }
578}
579
580impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
581where
582    St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
583    Primary: Sink<T> + Unpin,
584    P2PStreamError: Into<<Primary as Sink<T>>::Error>,
585{
586    type Error = <Primary as Sink<T>>::Error;
587
588    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
589        let this = self.get_mut();
590        if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) {
591            return Poll::Ready(Err(err.into()))
592        }
593        if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) {
594            return Poll::Ready(Err(err))
595        }
596        Poll::Ready(Ok(()))
597    }
598
599    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
600        self.get_mut().primary.st.start_send_unpin(item)
601    }
602
603    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
604        self.get_mut().inner.conn.poll_flush_unpin(cx).map_err(Into::into)
605    }
606
607    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
608        self.get_mut().inner.conn.poll_close_unpin(cx).map_err(Into::into)
609    }
610}
611
612/// Wraps a `RLPx` subprotocol and handles message ID multiplexing.
613struct ProtocolStream {
614    shared_cap: SharedCapability,
615    /// the channel shared with the satellite stream
616    to_satellite: UnboundedSender<BytesMut>,
617    satellite_st: Pin<Box<dyn Stream<Item = BytesMut> + Send>>,
618}
619
620impl ProtocolStream {
621    /// Masks the message ID of a message to be sent on the wire.
622    #[inline]
623    fn mask_msg_id(&self, mut msg: BytesMut) -> Result<Bytes, io::Error> {
624        if msg.is_empty() {
625            // message must not be empty
626            return Err(io::ErrorKind::InvalidInput.into())
627        }
628        msg[0] = msg[0]
629            .checked_add(self.shared_cap.relative_message_id_offset())
630            .ok_or(io::ErrorKind::InvalidInput)?;
631        Ok(msg.freeze())
632    }
633
634    /// Unmasks the message ID of a message received from the wire.
635    #[inline]
636    fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
637        if msg.is_empty() {
638            // message must not be empty
639            return Err(io::ErrorKind::InvalidInput.into())
640        }
641        msg[0] = msg[0]
642            .checked_sub(self.shared_cap.relative_message_id_offset())
643            .ok_or(io::ErrorKind::InvalidInput)?;
644        Ok(msg)
645    }
646
647    /// Sends the message to the satellite stream.
648    fn send_raw(&self, msg: BytesMut) {
649        let _ = self.unmask_id(msg).map(|msg| self.to_satellite.send(msg));
650    }
651}
652
653impl Stream for ProtocolStream {
654    type Item = Result<Bytes, io::Error>;
655
656    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
657        let this = self.get_mut();
658        let msg = ready!(this.satellite_st.as_mut().poll_next(cx));
659        Poll::Ready(msg.map(|msg| this.mask_msg_id(msg)))
660    }
661}
662
663impl fmt::Debug for ProtocolStream {
664    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665        f.debug_struct("ProtocolStream").field("cap", &self.shared_cap).finish_non_exhaustive()
666    }
667}
668
669#[cfg(test)]
670mod tests {
671    use super::*;
672    use crate::{
673        test_utils::{
674            connect_passthrough, eth_handshake, eth_hello,
675            proto::{test_hello, TestProtoMessage},
676        },
677        UnauthedP2PStream,
678    };
679    use reth_eth_wire_types::EthNetworkPrimitives;
680    use tokio::{net::TcpListener, sync::oneshot};
681    use tokio_util::codec::Decoder;
682
683    #[tokio::test]
684    async fn eth_satellite() {
685        reth_tracing::init_test_tracing();
686        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
687        let local_addr = listener.local_addr().unwrap();
688        let (status, fork_filter) = eth_handshake();
689        let other_status = status;
690        let other_fork_filter = fork_filter.clone();
691        let _handle = tokio::spawn(async move {
692            let (incoming, _) = listener.accept().await.unwrap();
693            let stream = crate::PassthroughCodec::default().framed(incoming);
694            let (server_hello, _) = eth_hello();
695            let (p2p_stream, _) =
696                UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();
697
698            let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream)
699                .handshake::<EthNetworkPrimitives>(other_status, other_fork_filter)
700                .await
701                .unwrap();
702
703            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
704        });
705
706        let conn = connect_passthrough(local_addr, eth_hello().0).await;
707        let eth = conn.shared_capabilities().eth().unwrap().clone();
708
709        let multiplexer = RlpxProtocolMultiplexer::new(conn);
710        let _satellite = multiplexer
711            .into_satellite_stream_with_handshake(
712                eth.capability().as_ref(),
713                move |proxy| async move {
714                    UnauthedEthStream::new(proxy)
715                        .handshake::<EthNetworkPrimitives>(status, fork_filter)
716                        .await
717                },
718            )
719            .await
720            .unwrap();
721    }
722
723    /// A test that install a satellite stream eth+test protocol and sends messages between them.
724    #[tokio::test(flavor = "multi_thread")]
725    async fn eth_test_protocol_satellite() {
726        reth_tracing::init_test_tracing();
727        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
728        let local_addr = listener.local_addr().unwrap();
729        let (status, fork_filter) = eth_handshake();
730        let other_status = status;
731        let other_fork_filter = fork_filter.clone();
732        let _handle = tokio::spawn(async move {
733            let (incoming, _) = listener.accept().await.unwrap();
734            let stream = crate::PassthroughCodec::default().framed(incoming);
735            let (server_hello, _) = test_hello();
736            let (conn, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();
737
738            let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
739                .into_eth_satellite_stream::<EthNetworkPrimitives>(other_status, other_fork_filter)
740                .await
741                .unwrap();
742
743            st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
744                async_stream::stream! {
745                    yield TestProtoMessage::ping().encoded();
746                    let msg = conn.next().await.unwrap();
747                    let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
748                    assert_eq!(msg, TestProtoMessage::pong());
749
750                    yield TestProtoMessage::message("hello").encoded();
751                    let msg = conn.next().await.unwrap();
752                    let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
753                    assert_eq!(msg, TestProtoMessage::message("good bye!"));
754
755                    yield TestProtoMessage::message("good bye!").encoded();
756
757                    futures::future::pending::<()>().await;
758                    unreachable!()
759                }
760            })
761            .unwrap();
762
763            loop {
764                let _ = st.next().await;
765            }
766        });
767
768        let conn = connect_passthrough(local_addr, test_hello().0).await;
769        let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
770            .into_eth_satellite_stream::<EthNetworkPrimitives>(status, fork_filter)
771            .await
772            .unwrap();
773
774        let (tx, mut rx) = oneshot::channel();
775
776        st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
777            async_stream::stream! {
778                let msg = conn.next().await.unwrap();
779                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
780                assert_eq!(msg, TestProtoMessage::ping());
781
782                yield TestProtoMessage::pong().encoded();
783
784                let msg = conn.next().await.unwrap();
785                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
786                assert_eq!(msg, TestProtoMessage::message("hello"));
787
788                yield TestProtoMessage::message("good bye!").encoded();
789
790                let msg = conn.next().await.unwrap();
791                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
792                assert_eq!(msg, TestProtoMessage::message("good bye!"));
793
794                tx.send(()).unwrap();
795
796                futures::future::pending::<()>().await;
797                unreachable!()
798            }
799        })
800        .unwrap();
801
802        loop {
803            tokio::select! {
804                _ = &mut rx => {
805                    break
806                }
807               _ = st.next() => {
808                }
809            }
810        }
811    }
812}