1use crate::{
8 errors::{EthHandshakeError, EthStreamError},
9 handshake::EthereumEthHandshake,
10 message::{EthBroadcastMessage, ProtocolBroadcastMessage},
11 p2pstream::HANDSHAKE_TIMEOUT,
12 CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
13 UnifiedStatus,
14};
15use alloy_primitives::bytes::{Bytes, BytesMut};
16use alloy_rlp::Encodable;
17use futures::{ready, Sink, SinkExt};
18use pin_project::pin_project;
19use reth_eth_wire_types::{NetworkPrimitives, RawCapabilityMessage};
20use reth_ethereum_forks::ForkFilter;
21use std::{
22 future::Future,
23 pin::Pin,
24 task::{Context, Poll},
25 time::Duration,
26};
27use tokio::time::timeout;
28use tokio_stream::Stream;
29use tracing::{debug, trace};
30
/// Upper bound, in bytes, on a single message accepted by [`EthStreamInner::decode_message`]
/// (10 MiB). Longer inputs are rejected with [`EthStreamError::MessageTooBig`].
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

/// A 500 KiB size limit, in bytes.
///
/// NOTE(review): not referenced in this file; presumably it caps the status message exchanged
/// during the handshake — confirm against the handshake implementation.
pub(crate) const MAX_STATUS_SIZE: usize = 500 * 1024;
37
/// An `eth` protocol stream that has not yet completed the status handshake.
///
/// It wraps the underlying transport `S`. The `handshake*` methods consume this value and, on
/// success, return an [`EthStream`] over the same transport.
#[pin_project]
#[derive(Debug)]
pub struct UnauthedEthStream<S> {
    /// The wrapped transport (structurally pinned).
    #[pin]
    inner: S,
}
46
47impl<S> UnauthedEthStream<S> {
48 pub const fn new(inner: S) -> Self {
50 Self { inner }
51 }
52
53 pub fn into_inner(self) -> S {
55 self.inner
56 }
57}
58
59impl<S, E> UnauthedEthStream<S>
60where
61 S: Stream<Item = Result<BytesMut, E>> + CanDisconnect<Bytes> + Send + Unpin,
62 EthStreamError: From<E> + From<<S as Sink<Bytes>>::Error>,
63{
64 pub async fn handshake<N: NetworkPrimitives>(
68 self,
69 status: UnifiedStatus,
70 fork_filter: ForkFilter,
71 ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
72 self.handshake_with_timeout(status, fork_filter, HANDSHAKE_TIMEOUT).await
73 }
74
75 pub async fn handshake_with_timeout<N: NetworkPrimitives>(
77 self,
78 status: UnifiedStatus,
79 fork_filter: ForkFilter,
80 timeout_limit: Duration,
81 ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
82 timeout(timeout_limit, Self::handshake_without_timeout(self, status, fork_filter))
83 .await
84 .map_err(|_| EthStreamError::StreamTimeout)?
85 }
86
87 pub async fn handshake_without_timeout<N: NetworkPrimitives>(
89 mut self,
90 status: UnifiedStatus,
91 fork_filter: ForkFilter,
92 ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
93 trace!(
94 status = %status.into_message(),
95 "sending eth status to peer"
96 );
97 let their_status =
98 EthereumEthHandshake(&mut self.inner).eth_handshake(status, fork_filter).await?;
99
100 let stream = EthStream::new(status.version, self.inner);
103
104 Ok((stream, their_status))
105 }
106}
107
/// The transport-independent part of an [`EthStream`]: it records the `eth` protocol version and
/// provides the message decoding and encoding helpers.
#[derive(Debug)]
pub struct EthStreamInner<N> {
    /// Protocol version handed to the message decoder.
    version: EthVersion,
    /// Ties this value to the network primitives `N` without storing a value of that type.
    _pd: std::marker::PhantomData<N>,
}
115
116impl<N> EthStreamInner<N>
117where
118 N: NetworkPrimitives,
119{
120 pub const fn new(version: EthVersion) -> Self {
122 Self { version, _pd: std::marker::PhantomData }
123 }
124
125 #[inline]
127 pub const fn version(&self) -> EthVersion {
128 self.version
129 }
130
131 pub fn decode_message(&self, bytes: BytesMut) -> Result<EthMessage<N>, EthStreamError> {
133 if bytes.len() > MAX_MESSAGE_SIZE {
134 return Err(EthStreamError::MessageTooBig(bytes.len()));
135 }
136
137 let msg = match ProtocolMessage::decode_message(self.version, &mut bytes.as_ref()) {
138 Ok(m) => m,
139 Err(err) => {
140 let msg = if bytes.len() > 50 {
141 format!("{:02x?}...{:x?}", &bytes[..10], &bytes[bytes.len() - 10..])
142 } else {
143 format!("{bytes:02x?}")
144 };
145 debug!(
146 version=?self.version,
147 %msg,
148 "failed to decode protocol message"
149 );
150 return Err(EthStreamError::InvalidMessage(err));
151 }
152 };
153
154 if matches!(msg.message, EthMessage::Status(_)) {
155 return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
156 }
157
158 Ok(msg.message)
159 }
160
161 pub fn encode_message(&self, item: EthMessage<N>) -> Result<Bytes, EthStreamError> {
165 if matches!(item, EthMessage::Status(_)) {
166 return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
167 }
168
169 Ok(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))
170 }
171}
172
/// An `eth` protocol connection over the byte transport `S`.
///
/// Depending on what `S` supports, this type is a `Stream` of decoded [`EthMessage`]s and a
/// `Sink` that encodes [`EthMessage`]s before passing them to the transport.
#[pin_project]
#[derive(Debug)]
pub struct EthStream<S, N = EthNetworkPrimitives> {
    /// Version-aware message encoder/decoder.
    eth: EthStreamInner<N>,
    /// The underlying transport carrying raw message bytes (structurally pinned).
    #[pin]
    inner: S,
}
183
184impl<S, N: NetworkPrimitives> EthStream<S, N> {
185 #[inline]
188 pub const fn new(version: EthVersion, inner: S) -> Self {
189 Self { eth: EthStreamInner::new(version), inner }
190 }
191
192 #[inline]
194 pub const fn version(&self) -> EthVersion {
195 self.eth.version()
196 }
197
198 #[inline]
200 pub const fn inner(&self) -> &S {
201 &self.inner
202 }
203
204 #[inline]
206 pub const fn inner_mut(&mut self) -> &mut S {
207 &mut self.inner
208 }
209
210 #[inline]
212 pub fn into_inner(self) -> S {
213 self.inner
214 }
215}
216
217impl<S, E, N> EthStream<S, N>
218where
219 S: Sink<Bytes, Error = E> + Unpin,
220 EthStreamError: From<E>,
221 N: NetworkPrimitives,
222{
223 pub fn start_send_broadcast(
225 &mut self,
226 item: EthBroadcastMessage<N>,
227 ) -> Result<(), EthStreamError> {
228 self.inner.start_send_unpin(Bytes::from(alloy_rlp::encode(
229 ProtocolBroadcastMessage::from(item),
230 )))?;
231
232 Ok(())
233 }
234
235 pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
237 let mut bytes = Vec::with_capacity(msg.payload.len() + 1);
238 msg.id.encode(&mut bytes);
239 bytes.extend_from_slice(&msg.payload);
240
241 self.inner.start_send_unpin(bytes.into())?;
242 Ok(())
243 }
244}
245
246impl<S, E, N> Stream for EthStream<S, N>
247where
248 S: Stream<Item = Result<BytesMut, E>> + Unpin,
249 EthStreamError: From<E>,
250 N: NetworkPrimitives,
251{
252 type Item = Result<EthMessage<N>, EthStreamError>;
253
254 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255 let this = self.project();
256 let res = ready!(this.inner.poll_next(cx));
257
258 match res {
259 Some(Ok(bytes)) => Poll::Ready(Some(this.eth.decode_message(bytes))),
260 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
261 None => Poll::Ready(None),
262 }
263 }
264}
265
266impl<S, N> Sink<EthMessage<N>> for EthStream<S, N>
267where
268 S: CanDisconnect<Bytes> + Unpin,
269 EthStreamError: From<<S as Sink<Bytes>>::Error>,
270 N: NetworkPrimitives,
271{
272 type Error = EthStreamError;
273
274 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
275 self.project().inner.poll_ready(cx).map_err(Into::into)
276 }
277
278 fn start_send(self: Pin<&mut Self>, item: EthMessage<N>) -> Result<(), Self::Error> {
279 if matches!(item, EthMessage::Status(_)) {
280 return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake))
290 }
291
292 self.project()
293 .inner
294 .start_send(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))?;
295
296 Ok(())
297 }
298
299 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
300 self.project().inner.poll_flush(cx).map_err(Into::into)
301 }
302
303 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
304 self.project().inner.poll_close(cx).map_err(Into::into)
305 }
306}
307
308impl<S, N> CanDisconnect<EthMessage<N>> for EthStream<S, N>
309where
310 S: CanDisconnect<Bytes> + Send,
311 EthStreamError: From<<S as Sink<Bytes>>::Error>,
312 N: NetworkPrimitives,
313{
314 fn disconnect(
315 &mut self,
316 reason: DisconnectReason,
317 ) -> Pin<Box<dyn Future<Output = Result<(), EthStreamError>> + Send + '_>> {
318 Box::pin(async move { self.inner.disconnect(reason).await.map_err(Into::into) })
319 }
320}
321
322#[cfg(test)]
323mod tests {
324 use super::UnauthedEthStream;
325 use crate::{
326 broadcast::BlockHashNumber,
327 errors::{EthHandshakeError, EthStreamError},
328 ethstream::RawCapabilityMessage,
329 hello::DEFAULT_TCP_PORT,
330 p2pstream::UnauthedP2PStream,
331 EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec,
332 ProtocolVersion, Status, StatusMessage,
333 };
334 use alloy_chains::NamedChain;
335 use alloy_primitives::{bytes::Bytes, B256, U256};
336 use alloy_rlp::Decodable;
337 use futures::{SinkExt, StreamExt};
338 use reth_ecies::stream::ECIESStream;
339 use reth_eth_wire_types::{EthNetworkPrimitives, UnifiedStatus};
340 use reth_ethereum_forks::{ForkFilter, Head};
341 use reth_network_peers::pk2id;
342 use secp256k1::{SecretKey, SECP256K1};
343 use std::time::Duration;
344 use tokio::net::{TcpListener, TcpStream};
345 use tokio_util::codec::Decoder;
346
347 #[tokio::test]
348 async fn can_handshake() {
349 let genesis = B256::random();
350 let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
351
352 let status = Status {
353 version: EthVersion::Eth67,
354 chain: NamedChain::Mainnet.into(),
355 total_difficulty: U256::ZERO,
356 blockhash: B256::random(),
357 genesis,
358 forkid: fork_filter.current(),
360 };
361 let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
362
363 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
364 let local_addr = listener.local_addr().unwrap();
365
366 let status_clone = unified_status;
367 let fork_filter_clone = fork_filter.clone();
368 let handle = tokio::spawn(async move {
369 let (incoming, _) = listener.accept().await.unwrap();
371 let stream = PassthroughCodec::default().framed(incoming);
372 let (_, their_status) = UnauthedEthStream::new(stream)
373 .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
374 .await
375 .unwrap();
376
377 assert_eq!(their_status, status_clone);
379 });
380
381 let outgoing = TcpStream::connect(local_addr).await.unwrap();
382 let sink = PassthroughCodec::default().framed(outgoing);
383
384 let (_, their_status) = UnauthedEthStream::new(sink)
386 .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
387 .await
388 .unwrap();
389
390 assert_eq!(their_status, unified_status);
392
393 handle.await.unwrap();
395 }
396
397 #[tokio::test]
398 async fn pass_handshake_on_low_td_bitlen() {
399 let genesis = B256::random();
400 let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
401
402 let status = Status {
403 version: EthVersion::Eth67,
404 chain: NamedChain::Mainnet.into(),
405 total_difficulty: U256::from(2).pow(U256::from(100)) - U256::from(1),
406 blockhash: B256::random(),
407 genesis,
408 forkid: fork_filter.current(),
410 };
411 let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
412
413 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
414 let local_addr = listener.local_addr().unwrap();
415
416 let status_clone = unified_status;
417 let fork_filter_clone = fork_filter.clone();
418 let handle = tokio::spawn(async move {
419 let (incoming, _) = listener.accept().await.unwrap();
421 let stream = PassthroughCodec::default().framed(incoming);
422 let (_, their_status) = UnauthedEthStream::new(stream)
423 .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
424 .await
425 .unwrap();
426
427 assert_eq!(their_status, status_clone);
429 });
430
431 let outgoing = TcpStream::connect(local_addr).await.unwrap();
432 let sink = PassthroughCodec::default().framed(outgoing);
433
434 let (_, their_status) = UnauthedEthStream::new(sink)
436 .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
437 .await
438 .unwrap();
439
440 assert_eq!(their_status, unified_status);
442
443 handle.await.unwrap();
445 }
446
447 #[tokio::test]
448 async fn fail_handshake_on_high_td_bitlen() {
449 let genesis = B256::random();
450 let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
451
452 let status = Status {
453 version: EthVersion::Eth67,
454 chain: NamedChain::Mainnet.into(),
455 total_difficulty: U256::from(2).pow(U256::from(164)),
456 blockhash: B256::random(),
457 genesis,
458 forkid: fork_filter.current(),
460 };
461 let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
462
463 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
464 let local_addr = listener.local_addr().unwrap();
465
466 let status_clone = unified_status;
467 let fork_filter_clone = fork_filter.clone();
468 let handle = tokio::spawn(async move {
469 let (incoming, _) = listener.accept().await.unwrap();
471 let stream = PassthroughCodec::default().framed(incoming);
472 let handshake_res = UnauthedEthStream::new(stream)
473 .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
474 .await;
475
476 assert!(matches!(
478 handshake_res,
479 Err(EthStreamError::EthHandshakeError(
480 EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
481 ))
482 ));
483 });
484
485 let outgoing = TcpStream::connect(local_addr).await.unwrap();
486 let sink = PassthroughCodec::default().framed(outgoing);
487
488 let handshake_res = UnauthedEthStream::new(sink)
490 .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
491 .await;
492
493 assert!(matches!(
495 handshake_res,
496 Err(EthStreamError::EthHandshakeError(
497 EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
498 ))
499 ));
500
501 handle.await.unwrap();
503 }
504
505 #[tokio::test]
506 async fn can_write_and_read_cleartext() {
507 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
508 let local_addr = listener.local_addr().unwrap();
509 let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
510 vec![
511 BlockHashNumber { hash: B256::random(), number: 5 },
512 BlockHashNumber { hash: B256::random(), number: 6 },
513 ]
514 .into(),
515 );
516
517 let test_msg_clone = test_msg.clone();
518 let handle = tokio::spawn(async move {
519 let (incoming, _) = listener.accept().await.unwrap();
521 let stream = PassthroughCodec::default().framed(incoming);
522 let mut stream = EthStream::new(EthVersion::Eth67, stream);
523
524 let message = stream.next().await.unwrap().unwrap();
526 assert_eq!(message, test_msg_clone);
527 });
528
529 let outgoing = TcpStream::connect(local_addr).await.unwrap();
530 let sink = PassthroughCodec::default().framed(outgoing);
531 let mut client_stream = EthStream::new(EthVersion::Eth67, sink);
532
533 client_stream.send(test_msg).await.unwrap();
534
535 handle.await.unwrap();
537 }
538
539 #[tokio::test]
540 async fn can_write_and_read_ecies() {
541 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
542 let local_addr = listener.local_addr().unwrap();
543 let server_key = SecretKey::new(&mut rand_08::thread_rng());
544 let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
545 vec![
546 BlockHashNumber { hash: B256::random(), number: 5 },
547 BlockHashNumber { hash: B256::random(), number: 6 },
548 ]
549 .into(),
550 );
551
552 let test_msg_clone = test_msg.clone();
553 let handle = tokio::spawn(async move {
554 let (incoming, _) = listener.accept().await.unwrap();
556 let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
557 let mut stream = EthStream::new(EthVersion::Eth67, stream);
558
559 let message = stream.next().await.unwrap().unwrap();
561 assert_eq!(message, test_msg_clone);
562 });
563
564 let server_id = pk2id(&server_key.public_key(SECP256K1));
566
567 let client_key = SecretKey::new(&mut rand_08::thread_rng());
568
569 let outgoing = TcpStream::connect(local_addr).await.unwrap();
570 let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
571 let mut client_stream = EthStream::new(EthVersion::Eth67, outgoing);
572
573 client_stream.send(test_msg).await.unwrap();
574
575 handle.await.unwrap();
577 }
578
579 #[tokio::test(flavor = "multi_thread")]
580 async fn ethstream_over_p2p() {
581 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
584 let local_addr = listener.local_addr().unwrap();
585 let server_key = SecretKey::new(&mut rand_08::thread_rng());
586 let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
587 vec![
588 BlockHashNumber { hash: B256::random(), number: 5 },
589 BlockHashNumber { hash: B256::random(), number: 6 },
590 ]
591 .into(),
592 );
593
594 let genesis = B256::random();
595 let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
596
597 let status = Status {
598 version: EthVersion::Eth67,
599 chain: NamedChain::Mainnet.into(),
600 total_difficulty: U256::ZERO,
601 blockhash: B256::random(),
602 genesis,
603 forkid: fork_filter.current(),
605 };
606 let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
607
608 let status_copy = unified_status;
609 let fork_filter_clone = fork_filter.clone();
610 let test_msg_clone = test_msg.clone();
611 let handle = tokio::spawn(async move {
612 let (incoming, _) = listener.accept().await.unwrap();
614 let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
615
616 let server_hello = HelloMessageWithProtocols {
617 protocol_version: ProtocolVersion::V5,
618 client_version: "bitcoind/1.0.0".to_string(),
619 protocols: vec![EthVersion::Eth67.into()],
620 port: DEFAULT_TCP_PORT,
621 id: pk2id(&server_key.public_key(SECP256K1)),
622 };
623
624 let unauthed_stream = UnauthedP2PStream::new(stream);
625 let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap();
626 let (mut eth_stream, _) = UnauthedEthStream::new(p2p_stream)
627 .handshake(status_copy, fork_filter_clone)
628 .await
629 .unwrap();
630
631 let message = eth_stream.next().await.unwrap().unwrap();
633 assert_eq!(message, test_msg_clone);
634 });
635
636 let server_id = pk2id(&server_key.public_key(SECP256K1));
638
639 let client_key = SecretKey::new(&mut rand_08::thread_rng());
640
641 let outgoing = TcpStream::connect(local_addr).await.unwrap();
642 let sink = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
643
644 let client_hello = HelloMessageWithProtocols {
645 protocol_version: ProtocolVersion::V5,
646 client_version: "bitcoind/1.0.0".to_string(),
647 protocols: vec![EthVersion::Eth67.into()],
648 port: DEFAULT_TCP_PORT,
649 id: pk2id(&client_key.public_key(SECP256K1)),
650 };
651
652 let unauthed_stream = UnauthedP2PStream::new(sink);
653 let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap();
654
655 let (mut client_stream, _) = UnauthedEthStream::new(p2p_stream)
656 .handshake(unified_status, fork_filter)
657 .await
658 .unwrap();
659
660 client_stream.send(test_msg).await.unwrap();
661
662 handle.await.unwrap();
664 }
665
666 #[tokio::test]
667 async fn handshake_should_timeout() {
668 let genesis = B256::random();
669 let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
670
671 let status = Status {
672 version: EthVersion::Eth67,
673 chain: NamedChain::Mainnet.into(),
674 total_difficulty: U256::ZERO,
675 blockhash: B256::random(),
676 genesis,
677 forkid: fork_filter.current(),
679 };
680 let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
681
682 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
683 let local_addr = listener.local_addr().unwrap();
684
685 let status_clone = unified_status;
686 let fork_filter_clone = fork_filter.clone();
687 let _handle = tokio::spawn(async move {
688 tokio::time::sleep(Duration::from_secs(11)).await;
690 let (incoming, _) = listener.accept().await.unwrap();
692 let stream = PassthroughCodec::default().framed(incoming);
693 let (_, their_status) = UnauthedEthStream::new(stream)
694 .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
695 .await
696 .unwrap();
697
698 assert_eq!(their_status, status_clone);
700 });
701
702 let outgoing = TcpStream::connect(local_addr).await.unwrap();
703 let sink = PassthroughCodec::default().framed(outgoing);
704
705 let handshake_result = UnauthedEthStream::new(sink)
707 .handshake_with_timeout::<EthNetworkPrimitives>(
708 unified_status,
709 fork_filter,
710 Duration::from_secs(1),
711 )
712 .await;
713
714 assert!(
716 matches!(handshake_result, Err(e) if e.to_string() == EthStreamError::StreamTimeout.to_string())
717 );
718 }
719
720 #[tokio::test]
721 async fn can_write_and_read_raw_capability() {
722 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
723 let local_addr = listener.local_addr().unwrap();
724
725 let test_msg = RawCapabilityMessage { id: 0x1234, payload: Bytes::from(vec![1, 2, 3, 4]) };
726
727 let test_msg_clone = test_msg.clone();
728 let handle = tokio::spawn(async move {
729 let (incoming, _) = listener.accept().await.unwrap();
730 let stream = PassthroughCodec::default().framed(incoming);
731 let mut stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, stream);
732
733 let bytes = stream.inner_mut().next().await.unwrap().unwrap();
734
735 let mut id_bytes = &bytes[..];
737 let decoded_id = <usize as Decodable>::decode(&mut id_bytes).unwrap();
738 assert_eq!(decoded_id, test_msg_clone.id);
739
740 let remaining = id_bytes;
742 assert_eq!(remaining, &test_msg_clone.payload[..]);
743 });
744
745 let outgoing = TcpStream::connect(local_addr).await.unwrap();
746 let sink = PassthroughCodec::default().framed(outgoing);
747 let mut client_stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, sink);
748
749 client_stream.start_send_raw(test_msg).unwrap();
750 client_stream.inner_mut().flush().await.unwrap();
751
752 handle.await.unwrap();
753 }
754}