reth_eth_wire/
ethstream.rs

1//! Ethereum protocol stream implementations.
2//!
3//! Provides stream types for the Ethereum wire protocol.
4//! It separates protocol logic [`EthStreamInner`] from transport concerns [`EthStream`].
5//! Handles handshaking, message processing, and RLP serialization.
6
7use crate::{
8    errors::{EthHandshakeError, EthStreamError},
9    handshake::EthereumEthHandshake,
10    message::{EthBroadcastMessage, ProtocolBroadcastMessage},
11    p2pstream::HANDSHAKE_TIMEOUT,
12    CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
13    UnifiedStatus,
14};
15use alloy_primitives::bytes::{Bytes, BytesMut};
16use alloy_rlp::Encodable;
17use futures::{ready, Sink, SinkExt};
18use pin_project::pin_project;
19use reth_eth_wire_types::{NetworkPrimitives, RawCapabilityMessage};
20use reth_ethereum_forks::ForkFilter;
21use std::{
22    future::Future,
23    pin::Pin,
24    task::{Context, Poll},
25    time::Duration,
26};
27use tokio::time::timeout;
28use tokio_stream::Stream;
29use tracing::{debug, trace};
30
/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
///
/// The value is 10 MiB. [`EthStreamInner::decode_message`] rejects any incoming frame whose
/// byte length exceeds it before attempting to decode.
// The limit mirrors the one used by go-ethereum:
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

/// [`MAX_STATUS_SIZE`] is the maximum cap on the size of the initial status message.
///
/// The value is `500 * 1024`; presumably a byte count like [`MAX_MESSAGE_SIZE`] (it is not
/// used in this file, so confirm against the handshake code that enforces it).
pub(crate) const MAX_STATUS_SIZE: usize = 500 * 1024;
37
/// An un-authenticated [`EthStream`]. This is consumed and returns a [`EthStream`] after the
/// `Status` handshake is completed.
///
/// The type carries no state besides the wrapped stream; it exists so that the handshake must
/// be performed before an [`EthStream`] can be obtained through this path.
#[pin_project]
#[derive(Debug)]
pub struct UnauthedEthStream<S> {
    /// The wrapped stream over which the `Status` handshake is performed.
    #[pin]
    inner: S,
}
46
47impl<S> UnauthedEthStream<S> {
48    /// Create a new `UnauthedEthStream` from a type `S` which implements `Stream` and `Sink`.
49    pub const fn new(inner: S) -> Self {
50        Self { inner }
51    }
52
53    /// Consumes the type and returns the wrapped stream
54    pub fn into_inner(self) -> S {
55        self.inner
56    }
57}
58
59impl<S, E> UnauthedEthStream<S>
60where
61    S: Stream<Item = Result<BytesMut, E>> + CanDisconnect<Bytes> + Send + Unpin,
62    EthStreamError: From<E> + From<<S as Sink<Bytes>>::Error>,
63{
64    /// Consumes the [`UnauthedEthStream`] and returns an [`EthStream`] after the `Status`
65    /// handshake is completed successfully. This also returns the `Status` message sent by the
66    /// remote peer.
67    pub async fn handshake<N: NetworkPrimitives>(
68        self,
69        status: UnifiedStatus,
70        fork_filter: ForkFilter,
71    ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
72        self.handshake_with_timeout(status, fork_filter, HANDSHAKE_TIMEOUT).await
73    }
74
75    /// Wrapper around handshake which enforces a timeout.
76    pub async fn handshake_with_timeout<N: NetworkPrimitives>(
77        self,
78        status: UnifiedStatus,
79        fork_filter: ForkFilter,
80        timeout_limit: Duration,
81    ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
82        timeout(timeout_limit, Self::handshake_without_timeout(self, status, fork_filter))
83            .await
84            .map_err(|_| EthStreamError::StreamTimeout)?
85    }
86
87    /// Handshake with no timeout
88    pub async fn handshake_without_timeout<N: NetworkPrimitives>(
89        mut self,
90        status: UnifiedStatus,
91        fork_filter: ForkFilter,
92    ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
93        trace!(
94            status = %status.into_message(),
95            "sending eth status to peer"
96        );
97        let their_status =
98            EthereumEthHandshake(&mut self.inner).eth_handshake(status, fork_filter).await?;
99
100        // now we can create the `EthStream` because the peer has successfully completed
101        // the handshake
102        let stream = EthStream::new(status.version, self.inner);
103
104        Ok((stream, their_status))
105    }
106}
107
/// Contains eth protocol specific logic for processing messages
///
/// This type holds no transport; it only knows the negotiated version and how to turn bytes
/// into messages and back.
#[derive(Debug)]
pub struct EthStreamInner<N> {
    /// Negotiated eth version
    version: EthVersion,
    /// Zero-sized marker that ties this type to the network primitives `N` without storing one.
    _pd: std::marker::PhantomData<N>,
}
115
116impl<N> EthStreamInner<N>
117where
118    N: NetworkPrimitives,
119{
120    /// Creates a new [`EthStreamInner`] with the given eth version
121    pub const fn new(version: EthVersion) -> Self {
122        Self { version, _pd: std::marker::PhantomData }
123    }
124
125    /// Returns the eth version
126    #[inline]
127    pub const fn version(&self) -> EthVersion {
128        self.version
129    }
130
131    /// Decodes incoming bytes into an [`EthMessage`].
132    pub fn decode_message(&self, bytes: BytesMut) -> Result<EthMessage<N>, EthStreamError> {
133        if bytes.len() > MAX_MESSAGE_SIZE {
134            return Err(EthStreamError::MessageTooBig(bytes.len()));
135        }
136
137        let msg = match ProtocolMessage::decode_message(self.version, &mut bytes.as_ref()) {
138            Ok(m) => m,
139            Err(err) => {
140                let msg = if bytes.len() > 50 {
141                    format!("{:02x?}...{:x?}", &bytes[..10], &bytes[bytes.len() - 10..])
142                } else {
143                    format!("{bytes:02x?}")
144                };
145                debug!(
146                    version=?self.version,
147                    %msg,
148                    "failed to decode protocol message"
149                );
150                return Err(EthStreamError::InvalidMessage(err));
151            }
152        };
153
154        if matches!(msg.message, EthMessage::Status(_)) {
155            return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
156        }
157
158        Ok(msg.message)
159    }
160
161    /// Encodes an [`EthMessage`] to bytes.
162    ///
163    /// Validates that Status messages are not sent after handshake, enforcing protocol rules.
164    pub fn encode_message(&self, item: EthMessage<N>) -> Result<Bytes, EthStreamError> {
165        if matches!(item, EthMessage::Status(_)) {
166            return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
167        }
168
169        Ok(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))
170    }
171}
172
/// An `EthStream` wraps over any `Stream` that yields bytes and makes it
/// compatible with eth-networking protocol messages, which get RLP encoded/decoded.
///
/// `N` selects the network primitives used for the message types and defaults to
/// [`EthNetworkPrimitives`].
#[pin_project]
#[derive(Debug)]
pub struct EthStream<S, N = EthNetworkPrimitives> {
    /// Eth-specific logic
    eth: EthStreamInner<N>,
    /// The underlying stream that raw message bytes are read from and written to.
    #[pin]
    inner: S,
}
183
184impl<S, N: NetworkPrimitives> EthStream<S, N> {
185    /// Creates a new unauthed [`EthStream`] from a provided stream. You will need
186    /// to manually handshake a peer.
187    #[inline]
188    pub const fn new(version: EthVersion, inner: S) -> Self {
189        Self { eth: EthStreamInner::new(version), inner }
190    }
191
192    /// Returns the eth version.
193    #[inline]
194    pub const fn version(&self) -> EthVersion {
195        self.eth.version()
196    }
197
198    /// Returns the underlying stream.
199    #[inline]
200    pub const fn inner(&self) -> &S {
201        &self.inner
202    }
203
204    /// Returns mutable access to the underlying stream.
205    #[inline]
206    pub const fn inner_mut(&mut self) -> &mut S {
207        &mut self.inner
208    }
209
210    /// Consumes this type and returns the wrapped stream.
211    #[inline]
212    pub fn into_inner(self) -> S {
213        self.inner
214    }
215}
216
217impl<S, E, N> EthStream<S, N>
218where
219    S: Sink<Bytes, Error = E> + Unpin,
220    EthStreamError: From<E>,
221    N: NetworkPrimitives,
222{
223    /// Same as [`Sink::start_send`] but accepts a [`EthBroadcastMessage`] instead.
224    pub fn start_send_broadcast(
225        &mut self,
226        item: EthBroadcastMessage<N>,
227    ) -> Result<(), EthStreamError> {
228        self.inner.start_send_unpin(Bytes::from(alloy_rlp::encode(
229            ProtocolBroadcastMessage::from(item),
230        )))?;
231
232        Ok(())
233    }
234
235    /// Sends a raw capability message directly over the stream
236    pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
237        let mut bytes = Vec::with_capacity(msg.payload.len() + 1);
238        msg.id.encode(&mut bytes);
239        bytes.extend_from_slice(&msg.payload);
240
241        self.inner.start_send_unpin(bytes.into())?;
242        Ok(())
243    }
244}
245
246impl<S, E, N> Stream for EthStream<S, N>
247where
248    S: Stream<Item = Result<BytesMut, E>> + Unpin,
249    EthStreamError: From<E>,
250    N: NetworkPrimitives,
251{
252    type Item = Result<EthMessage<N>, EthStreamError>;
253
254    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255        let this = self.project();
256        let res = ready!(this.inner.poll_next(cx));
257
258        match res {
259            Some(Ok(bytes)) => Poll::Ready(Some(this.eth.decode_message(bytes))),
260            Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
261            None => Poll::Ready(None),
262        }
263    }
264}
265
266impl<S, N> Sink<EthMessage<N>> for EthStream<S, N>
267where
268    S: CanDisconnect<Bytes> + Unpin,
269    EthStreamError: From<<S as Sink<Bytes>>::Error>,
270    N: NetworkPrimitives,
271{
272    type Error = EthStreamError;
273
274    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
275        self.project().inner.poll_ready(cx).map_err(Into::into)
276    }
277
278    fn start_send(self: Pin<&mut Self>, item: EthMessage<N>) -> Result<(), Self::Error> {
279        if matches!(item, EthMessage::Status(_)) {
280            // TODO: to disconnect here we would need to do something similar to P2PStream's
281            // start_disconnect, which would ideally be a part of the CanDisconnect trait, or at
282            // least similar.
283            //
284            // Other parts of reth do not yet need traits like CanDisconnect because atm they work
285            // exclusively with EthStream<P2PStream<S>>, where the inner P2PStream is accessible,
286            // allowing for its start_disconnect method to be called.
287            //
288            // self.project().inner.start_disconnect(DisconnectReason::ProtocolBreach);
289            return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake))
290        }
291
292        self.project()
293            .inner
294            .start_send(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))?;
295
296        Ok(())
297    }
298
299    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
300        self.project().inner.poll_flush(cx).map_err(Into::into)
301    }
302
303    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
304        self.project().inner.poll_close(cx).map_err(Into::into)
305    }
306}
307
308impl<S, N> CanDisconnect<EthMessage<N>> for EthStream<S, N>
309where
310    S: CanDisconnect<Bytes> + Send,
311    EthStreamError: From<<S as Sink<Bytes>>::Error>,
312    N: NetworkPrimitives,
313{
314    fn disconnect(
315        &mut self,
316        reason: DisconnectReason,
317    ) -> Pin<Box<dyn Future<Output = Result<(), EthStreamError>> + Send + '_>> {
318        Box::pin(async move { self.inner.disconnect(reason).await.map_err(Into::into) })
319    }
320}
321
322#[cfg(test)]
323mod tests {
324    use super::UnauthedEthStream;
325    use crate::{
326        broadcast::BlockHashNumber,
327        errors::{EthHandshakeError, EthStreamError},
328        ethstream::RawCapabilityMessage,
329        hello::DEFAULT_TCP_PORT,
330        p2pstream::UnauthedP2PStream,
331        EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec,
332        ProtocolVersion, Status, StatusMessage,
333    };
334    use alloy_chains::NamedChain;
335    use alloy_primitives::{bytes::Bytes, B256, U256};
336    use alloy_rlp::Decodable;
337    use futures::{SinkExt, StreamExt};
338    use reth_ecies::stream::ECIESStream;
339    use reth_eth_wire_types::{EthNetworkPrimitives, UnifiedStatus};
340    use reth_ethereum_forks::{ForkFilter, Head};
341    use reth_network_peers::pk2id;
342    use secp256k1::{SecretKey, SECP256K1};
343    use std::time::Duration;
344    use tokio::net::{TcpListener, TcpStream};
345    use tokio_util::codec::Decoder;
346
347    #[tokio::test]
348    async fn can_handshake() {
349        let genesis = B256::random();
350        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
351
352        let status = Status {
353            version: EthVersion::Eth67,
354            chain: NamedChain::Mainnet.into(),
355            total_difficulty: U256::ZERO,
356            blockhash: B256::random(),
357            genesis,
358            // Pass the current fork id.
359            forkid: fork_filter.current(),
360        };
361        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
362
363        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
364        let local_addr = listener.local_addr().unwrap();
365
366        let status_clone = unified_status;
367        let fork_filter_clone = fork_filter.clone();
368        let handle = tokio::spawn(async move {
369            // roughly based off of the design of tokio::net::TcpListener
370            let (incoming, _) = listener.accept().await.unwrap();
371            let stream = PassthroughCodec::default().framed(incoming);
372            let (_, their_status) = UnauthedEthStream::new(stream)
373                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
374                .await
375                .unwrap();
376
377            // just make sure it equals our status (our status is a clone of their status)
378            assert_eq!(their_status, status_clone);
379        });
380
381        let outgoing = TcpStream::connect(local_addr).await.unwrap();
382        let sink = PassthroughCodec::default().framed(outgoing);
383
384        // try to connect
385        let (_, their_status) = UnauthedEthStream::new(sink)
386            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
387            .await
388            .unwrap();
389
390        // their status is a clone of our status, these should be equal
391        assert_eq!(their_status, unified_status);
392
393        // wait for it to finish
394        handle.await.unwrap();
395    }
396
397    #[tokio::test]
398    async fn pass_handshake_on_low_td_bitlen() {
399        let genesis = B256::random();
400        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
401
402        let status = Status {
403            version: EthVersion::Eth67,
404            chain: NamedChain::Mainnet.into(),
405            total_difficulty: U256::from(2).pow(U256::from(100)) - U256::from(1),
406            blockhash: B256::random(),
407            genesis,
408            // Pass the current fork id.
409            forkid: fork_filter.current(),
410        };
411        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
412
413        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
414        let local_addr = listener.local_addr().unwrap();
415
416        let status_clone = unified_status;
417        let fork_filter_clone = fork_filter.clone();
418        let handle = tokio::spawn(async move {
419            // roughly based off of the design of tokio::net::TcpListener
420            let (incoming, _) = listener.accept().await.unwrap();
421            let stream = PassthroughCodec::default().framed(incoming);
422            let (_, their_status) = UnauthedEthStream::new(stream)
423                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
424                .await
425                .unwrap();
426
427            // just make sure it equals our status, and that the handshake succeeded
428            assert_eq!(their_status, status_clone);
429        });
430
431        let outgoing = TcpStream::connect(local_addr).await.unwrap();
432        let sink = PassthroughCodec::default().framed(outgoing);
433
434        // try to connect
435        let (_, their_status) = UnauthedEthStream::new(sink)
436            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
437            .await
438            .unwrap();
439
440        // their status is a clone of our status, these should be equal
441        assert_eq!(their_status, unified_status);
442
443        // await the other handshake
444        handle.await.unwrap();
445    }
446
447    #[tokio::test]
448    async fn fail_handshake_on_high_td_bitlen() {
449        let genesis = B256::random();
450        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
451
452        let status = Status {
453            version: EthVersion::Eth67,
454            chain: NamedChain::Mainnet.into(),
455            total_difficulty: U256::from(2).pow(U256::from(164)),
456            blockhash: B256::random(),
457            genesis,
458            // Pass the current fork id.
459            forkid: fork_filter.current(),
460        };
461        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
462
463        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
464        let local_addr = listener.local_addr().unwrap();
465
466        let status_clone = unified_status;
467        let fork_filter_clone = fork_filter.clone();
468        let handle = tokio::spawn(async move {
469            // roughly based off of the design of tokio::net::TcpListener
470            let (incoming, _) = listener.accept().await.unwrap();
471            let stream = PassthroughCodec::default().framed(incoming);
472            let handshake_res = UnauthedEthStream::new(stream)
473                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
474                .await;
475
476            // make sure the handshake fails due to td too high
477            assert!(matches!(
478                handshake_res,
479                Err(EthStreamError::EthHandshakeError(
480                    EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
481                ))
482            ));
483        });
484
485        let outgoing = TcpStream::connect(local_addr).await.unwrap();
486        let sink = PassthroughCodec::default().framed(outgoing);
487
488        // try to connect
489        let handshake_res = UnauthedEthStream::new(sink)
490            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
491            .await;
492
493        // this handshake should also fail due to td too high
494        assert!(matches!(
495            handshake_res,
496            Err(EthStreamError::EthHandshakeError(
497                EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
498            ))
499        ));
500
501        // await the other handshake
502        handle.await.unwrap();
503    }
504
505    #[tokio::test]
506    async fn can_write_and_read_cleartext() {
507        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
508        let local_addr = listener.local_addr().unwrap();
509        let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
510            vec![
511                BlockHashNumber { hash: B256::random(), number: 5 },
512                BlockHashNumber { hash: B256::random(), number: 6 },
513            ]
514            .into(),
515        );
516
517        let test_msg_clone = test_msg.clone();
518        let handle = tokio::spawn(async move {
519            // roughly based off of the design of tokio::net::TcpListener
520            let (incoming, _) = listener.accept().await.unwrap();
521            let stream = PassthroughCodec::default().framed(incoming);
522            let mut stream = EthStream::new(EthVersion::Eth67, stream);
523
524            // use the stream to get the next message
525            let message = stream.next().await.unwrap().unwrap();
526            assert_eq!(message, test_msg_clone);
527        });
528
529        let outgoing = TcpStream::connect(local_addr).await.unwrap();
530        let sink = PassthroughCodec::default().framed(outgoing);
531        let mut client_stream = EthStream::new(EthVersion::Eth67, sink);
532
533        client_stream.send(test_msg).await.unwrap();
534
535        // make sure the server receives the message and asserts before ending the test
536        handle.await.unwrap();
537    }
538
539    #[tokio::test]
540    async fn can_write_and_read_ecies() {
541        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
542        let local_addr = listener.local_addr().unwrap();
543        let server_key = SecretKey::new(&mut rand_08::thread_rng());
544        let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
545            vec![
546                BlockHashNumber { hash: B256::random(), number: 5 },
547                BlockHashNumber { hash: B256::random(), number: 6 },
548            ]
549            .into(),
550        );
551
552        let test_msg_clone = test_msg.clone();
553        let handle = tokio::spawn(async move {
554            // roughly based off of the design of tokio::net::TcpListener
555            let (incoming, _) = listener.accept().await.unwrap();
556            let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
557            let mut stream = EthStream::new(EthVersion::Eth67, stream);
558
559            // use the stream to get the next message
560            let message = stream.next().await.unwrap().unwrap();
561            assert_eq!(message, test_msg_clone);
562        });
563
564        // create the server pubkey
565        let server_id = pk2id(&server_key.public_key(SECP256K1));
566
567        let client_key = SecretKey::new(&mut rand_08::thread_rng());
568
569        let outgoing = TcpStream::connect(local_addr).await.unwrap();
570        let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
571        let mut client_stream = EthStream::new(EthVersion::Eth67, outgoing);
572
573        client_stream.send(test_msg).await.unwrap();
574
575        // make sure the server receives the message and asserts before ending the test
576        handle.await.unwrap();
577    }
578
579    #[tokio::test(flavor = "multi_thread")]
580    async fn ethstream_over_p2p() {
581        // create a p2p stream and server, then confirm that the two are authed
582        // create tcpstream
583        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
584        let local_addr = listener.local_addr().unwrap();
585        let server_key = SecretKey::new(&mut rand_08::thread_rng());
586        let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
587            vec![
588                BlockHashNumber { hash: B256::random(), number: 5 },
589                BlockHashNumber { hash: B256::random(), number: 6 },
590            ]
591            .into(),
592        );
593
594        let genesis = B256::random();
595        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
596
597        let status = Status {
598            version: EthVersion::Eth67,
599            chain: NamedChain::Mainnet.into(),
600            total_difficulty: U256::ZERO,
601            blockhash: B256::random(),
602            genesis,
603            // Pass the current fork id.
604            forkid: fork_filter.current(),
605        };
606        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
607
608        let status_copy = unified_status;
609        let fork_filter_clone = fork_filter.clone();
610        let test_msg_clone = test_msg.clone();
611        let handle = tokio::spawn(async move {
612            // roughly based off of the design of tokio::net::TcpListener
613            let (incoming, _) = listener.accept().await.unwrap();
614            let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
615
616            let server_hello = HelloMessageWithProtocols {
617                protocol_version: ProtocolVersion::V5,
618                client_version: "bitcoind/1.0.0".to_string(),
619                protocols: vec![EthVersion::Eth67.into()],
620                port: DEFAULT_TCP_PORT,
621                id: pk2id(&server_key.public_key(SECP256K1)),
622            };
623
624            let unauthed_stream = UnauthedP2PStream::new(stream);
625            let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap();
626            let (mut eth_stream, _) = UnauthedEthStream::new(p2p_stream)
627                .handshake(status_copy, fork_filter_clone)
628                .await
629                .unwrap();
630
631            // use the stream to get the next message
632            let message = eth_stream.next().await.unwrap().unwrap();
633            assert_eq!(message, test_msg_clone);
634        });
635
636        // create the server pubkey
637        let server_id = pk2id(&server_key.public_key(SECP256K1));
638
639        let client_key = SecretKey::new(&mut rand_08::thread_rng());
640
641        let outgoing = TcpStream::connect(local_addr).await.unwrap();
642        let sink = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
643
644        let client_hello = HelloMessageWithProtocols {
645            protocol_version: ProtocolVersion::V5,
646            client_version: "bitcoind/1.0.0".to_string(),
647            protocols: vec![EthVersion::Eth67.into()],
648            port: DEFAULT_TCP_PORT,
649            id: pk2id(&client_key.public_key(SECP256K1)),
650        };
651
652        let unauthed_stream = UnauthedP2PStream::new(sink);
653        let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap();
654
655        let (mut client_stream, _) = UnauthedEthStream::new(p2p_stream)
656            .handshake(unified_status, fork_filter)
657            .await
658            .unwrap();
659
660        client_stream.send(test_msg).await.unwrap();
661
662        // make sure the server receives the message and asserts before ending the test
663        handle.await.unwrap();
664    }
665
666    #[tokio::test]
667    async fn handshake_should_timeout() {
668        let genesis = B256::random();
669        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
670
671        let status = Status {
672            version: EthVersion::Eth67,
673            chain: NamedChain::Mainnet.into(),
674            total_difficulty: U256::ZERO,
675            blockhash: B256::random(),
676            genesis,
677            // Pass the current fork id.
678            forkid: fork_filter.current(),
679        };
680        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
681
682        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
683        let local_addr = listener.local_addr().unwrap();
684
685        let status_clone = unified_status;
686        let fork_filter_clone = fork_filter.clone();
687        let _handle = tokio::spawn(async move {
688            // Delay accepting the connection for longer than the client's timeout period
689            tokio::time::sleep(Duration::from_secs(11)).await;
690            // roughly based off of the design of tokio::net::TcpListener
691            let (incoming, _) = listener.accept().await.unwrap();
692            let stream = PassthroughCodec::default().framed(incoming);
693            let (_, their_status) = UnauthedEthStream::new(stream)
694                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
695                .await
696                .unwrap();
697
698            // just make sure it equals our status (our status is a clone of their status)
699            assert_eq!(their_status, status_clone);
700        });
701
702        let outgoing = TcpStream::connect(local_addr).await.unwrap();
703        let sink = PassthroughCodec::default().framed(outgoing);
704
705        // try to connect
706        let handshake_result = UnauthedEthStream::new(sink)
707            .handshake_with_timeout::<EthNetworkPrimitives>(
708                unified_status,
709                fork_filter,
710                Duration::from_secs(1),
711            )
712            .await;
713
714        // Assert that a timeout error occurred
715        assert!(
716            matches!(handshake_result, Err(e) if e.to_string() == EthStreamError::StreamTimeout.to_string())
717        );
718    }
719
720    #[tokio::test]
721    async fn can_write_and_read_raw_capability() {
722        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
723        let local_addr = listener.local_addr().unwrap();
724
725        let test_msg = RawCapabilityMessage { id: 0x1234, payload: Bytes::from(vec![1, 2, 3, 4]) };
726
727        let test_msg_clone = test_msg.clone();
728        let handle = tokio::spawn(async move {
729            let (incoming, _) = listener.accept().await.unwrap();
730            let stream = PassthroughCodec::default().framed(incoming);
731            let mut stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, stream);
732
733            let bytes = stream.inner_mut().next().await.unwrap().unwrap();
734
735            // Create a cursor to track position while decoding
736            let mut id_bytes = &bytes[..];
737            let decoded_id = <usize as Decodable>::decode(&mut id_bytes).unwrap();
738            assert_eq!(decoded_id, test_msg_clone.id);
739
740            // Get remaining bytes after ID decoding
741            let remaining = id_bytes;
742            assert_eq!(remaining, &test_msg_clone.payload[..]);
743        });
744
745        let outgoing = TcpStream::connect(local_addr).await.unwrap();
746        let sink = PassthroughCodec::default().framed(outgoing);
747        let mut client_stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, sink);
748
749        client_stream.start_send_raw(test_msg).unwrap();
750        client_stream.inner_mut().flush().await.unwrap();
751
752        handle.await.unwrap();
753    }
754}