1mod active;
4mod conn;
5mod counter;
6mod handle;
7
8use active::QueuedOutgoingMessages;
9pub use conn::EthRlpxConnection;
10pub use handle::{
11 ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
12 SessionCommand,
13};
14
15pub use reth_network_api::{Direction, PeerInfo};
16
17use std::{
18 collections::HashMap,
19 future::Future,
20 net::SocketAddr,
21 sync::{atomic::AtomicU64, Arc},
22 task::{Context, Poll},
23 time::{Duration, Instant},
24};
25
26use crate::{
27 message::PeerMessage,
28 metrics::SessionManagerMetrics,
29 protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
30 session::active::ActiveSession,
31};
32use counter::SessionCounter;
33use futures::{future::Either, io, FutureExt, StreamExt};
34use reth_ecies::{stream::ECIESStream, ECIESError};
35use reth_eth_wire::{
36 errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
37 BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
38 HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
39 HANDSHAKE_TIMEOUT,
40};
41use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
42use reth_metrics::common::mpsc::MeteredPollSender;
43use reth_network_api::{PeerRequest, PeerRequestSender};
44use reth_network_peers::PeerId;
45use reth_network_types::SessionsConfig;
46use reth_tasks::TaskSpawner;
47use rustc_hash::FxHashMap;
48use secp256k1::SecretKey;
49use tokio::{
50 io::{AsyncRead, AsyncWrite},
51 net::TcpStream,
52 sync::{mpsc, mpsc::error::TrySendError, oneshot},
53};
54use tokio_stream::wrappers::ReceiverStream;
55use tokio_util::sync::PollSender;
56use tracing::{debug, instrument, trace};
57
58#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
60pub struct SessionId(usize);
61
62#[must_use = "Session Manager must be polled to process session events."]
64#[derive(Debug)]
65pub struct SessionManager<N: NetworkPrimitives> {
66 next_id: usize,
68 counter: SessionCounter,
70 initial_internal_request_timeout: Duration,
73 protocol_breach_request_timeout: Duration,
76 pending_session_timeout: Duration,
78 secret_key: SecretKey,
80 status: UnifiedStatus,
82 hello_message: HelloMessageWithProtocols,
84 fork_filter: ForkFilter,
86 session_command_buffer: usize,
88 executor: Box<dyn TaskSpawner>,
90 pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
95 active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
97 pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
102 pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
104 active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
109 active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
111 extra_protocols: RlpxSubProtocols,
113 disconnections_counter: DisconnectionsCounter,
115 metrics: SessionManagerMetrics,
117 handshake: Arc<dyn EthRlpxHandshake>,
119}
120
121impl<N: NetworkPrimitives> SessionManager<N> {
124 #[expect(clippy::too_many_arguments)]
126 pub fn new(
127 secret_key: SecretKey,
128 config: SessionsConfig,
129 executor: Box<dyn TaskSpawner>,
130 status: UnifiedStatus,
131 hello_message: HelloMessageWithProtocols,
132 fork_filter: ForkFilter,
133 extra_protocols: RlpxSubProtocols,
134 handshake: Arc<dyn EthRlpxHandshake>,
135 ) -> Self {
136 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
137 let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
138 let active_session_tx = PollSender::new(active_session_tx);
139
140 Self {
141 next_id: 0,
142 counter: SessionCounter::new(config.limits),
143 initial_internal_request_timeout: config.initial_internal_request_timeout,
144 protocol_breach_request_timeout: config.protocol_breach_request_timeout,
145 pending_session_timeout: config.pending_session_timeout,
146 secret_key,
147 status,
148 hello_message,
149 fork_filter,
150 session_command_buffer: config.session_command_buffer,
151 executor,
152 pending_sessions: Default::default(),
153 active_sessions: Default::default(),
154 pending_sessions_tx,
155 pending_session_rx: ReceiverStream::new(pending_sessions_rx),
156 active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
157 active_session_rx: ReceiverStream::new(active_session_rx),
158 extra_protocols,
159 disconnections_counter: Default::default(),
160 metrics: Default::default(),
161 handshake,
162 }
163 }
164
165 pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
168 self.fork_filter.validate(fork_id).is_ok()
169 }
170
171 const fn next_id(&mut self) -> SessionId {
173 let id = self.next_id;
174 self.next_id += 1;
175 SessionId(id)
176 }
177
178 pub const fn status(&self) -> UnifiedStatus {
180 self.status
181 }
182
183 pub const fn secret_key(&self) -> SecretKey {
185 self.secret_key
186 }
187
188 pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
190 &self.active_sessions
191 }
192
193 pub fn hello_message(&self) -> HelloMessageWithProtocols {
195 self.hello_message.clone()
196 }
197
198 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
200 self.extra_protocols.push(protocol)
201 }
202
203 #[inline]
205 pub(crate) fn num_pending_connections(&self) -> usize {
206 self.pending_sessions.len()
207 }
208
209 fn spawn<F>(&self, f: F)
212 where
213 F: Future<Output = ()> + Send + 'static,
214 {
215 self.executor.spawn(f.boxed());
216 }
217
218 pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
223 self.status.blockhash = head.hash;
224 self.status.total_difficulty = Some(head.total_difficulty);
225 let transition = self.fork_filter.set_head(head);
226 self.status.forkid = self.fork_filter.current();
227 self.status.latest_block = Some(head.number);
228
229 transition
230 }
231
232 pub(crate) fn on_incoming(
237 &mut self,
238 stream: TcpStream,
239 remote_addr: SocketAddr,
240 ) -> Result<SessionId, ExceedsSessionLimit> {
241 self.counter.ensure_pending_inbound()?;
242
243 let session_id = self.next_id();
244
245 trace!(
246 target: "net::session",
247 ?remote_addr,
248 ?session_id,
249 "new pending incoming session"
250 );
251
252 let (disconnect_tx, disconnect_rx) = oneshot::channel();
253 let pending_events = self.pending_sessions_tx.clone();
254 let secret_key = self.secret_key;
255 let hello_message = self.hello_message.clone();
256 let status = self.status;
257 let fork_filter = self.fork_filter.clone();
258 let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
259 self.spawn(pending_session_with_timeout(
260 self.pending_session_timeout,
261 session_id,
262 remote_addr,
263 Direction::Incoming,
264 pending_events.clone(),
265 start_pending_incoming_session(
266 self.handshake.clone(),
267 disconnect_rx,
268 session_id,
269 stream,
270 pending_events,
271 remote_addr,
272 secret_key,
273 hello_message,
274 status,
275 fork_filter,
276 extra_handlers,
277 ),
278 ));
279
280 let handle = PendingSessionHandle {
281 disconnect_tx: Some(disconnect_tx),
282 direction: Direction::Incoming,
283 };
284 self.pending_sessions.insert(session_id, handle);
285 self.counter.inc_pending_inbound();
286 Ok(session_id)
287 }
288
289 pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
291 if self.counter.ensure_pending_outbound().is_ok() {
293 let session_id = self.next_id();
294 let (disconnect_tx, disconnect_rx) = oneshot::channel();
295 let pending_events = self.pending_sessions_tx.clone();
296 let secret_key = self.secret_key;
297 let hello_message = self.hello_message.clone();
298 let fork_filter = self.fork_filter.clone();
299 let status = self.status;
300 let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
301 self.spawn(pending_session_with_timeout(
302 self.pending_session_timeout,
303 session_id,
304 remote_addr,
305 Direction::Outgoing(remote_peer_id),
306 pending_events.clone(),
307 start_pending_outbound_session(
308 self.handshake.clone(),
309 disconnect_rx,
310 pending_events,
311 session_id,
312 remote_addr,
313 remote_peer_id,
314 secret_key,
315 hello_message,
316 status,
317 fork_filter,
318 extra_handlers,
319 ),
320 ));
321
322 let handle = PendingSessionHandle {
323 disconnect_tx: Some(disconnect_tx),
324 direction: Direction::Outgoing(remote_peer_id),
325 };
326 self.pending_sessions.insert(session_id, handle);
327 self.counter.inc_pending_outbound();
328 }
329 }
330
331 pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
336 if let Some(session) = self.active_sessions.get(&node) {
337 session.disconnect(reason);
338 }
339 }
340
341 pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
346 for session in self.active_sessions.values() {
347 session.disconnect(reason);
348 }
349 }
350
351 pub fn disconnect_all_pending(&mut self) {
353 for session in self.pending_sessions.values_mut() {
354 session.disconnect();
355 }
356 }
357
358 pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
360 if let Some(session) = self.active_sessions.get(peer_id) {
361 let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)).inspect_err(
362 |e| {
363 if let TrySendError::Full(_) = e {
364 debug!(
365 target: "net::session",
366 ?peer_id,
367 "session command buffer full, dropping message"
368 );
369 self.metrics.total_outgoing_peer_messages_dropped.increment(1);
370 }
371 },
372 );
373 }
374 }
375
376 fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
378 let session = self.pending_sessions.remove(id)?;
379 self.counter.dec_pending(&session.direction);
380 Some(session)
381 }
382
383 fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
385 let session = self.active_sessions.remove(id)?;
386 self.counter.dec_active(&session.direction);
387 Some(session)
388 }
389
390 pub(crate) fn try_disconnect_incoming_connection(
394 &self,
395 stream: TcpStream,
396 reason: DisconnectReason,
397 ) {
398 if !self.disconnections_counter.has_capacity() {
399 return
401 }
402
403 let guard = self.disconnections_counter.clone();
404 let secret_key = self.secret_key;
405
406 self.spawn(async move {
407 trace!(
408 target: "net::session",
409 "gracefully disconnecting incoming connection"
410 );
411 if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
412 let mut unauth = UnauthedP2PStream::new(stream);
413 let _ = unauth.send_disconnect(reason).await;
414 drop(guard);
415 }
416 });
417 }
418
419 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
423 match self.active_session_rx.poll_next_unpin(cx) {
425 Poll::Pending => {}
426 Poll::Ready(None) => {
427 unreachable!("Manager holds both channel halves.")
428 }
429 Poll::Ready(Some(event)) => {
430 return match event {
431 ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
432 trace!(
433 target: "net::session",
434 ?peer_id,
435 "gracefully disconnected active session."
436 );
437 self.remove_active_session(&peer_id);
438 Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
439 }
440 ActiveSessionMessage::ClosedOnConnectionError {
441 peer_id,
442 remote_addr,
443 error,
444 } => {
445 trace!(target: "net::session", ?peer_id, %error,"closed session.");
446 self.remove_active_session(&peer_id);
447 Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
448 remote_addr,
449 peer_id,
450 error,
451 })
452 }
453 ActiveSessionMessage::ValidMessage { peer_id, message } => {
454 Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
455 }
456 ActiveSessionMessage::BadMessage { peer_id } => {
457 Poll::Ready(SessionEvent::BadMessage { peer_id })
458 }
459 ActiveSessionMessage::ProtocolBreach { peer_id } => {
460 Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
461 }
462 }
463 }
464 }
465
466 let event = match self.pending_session_rx.poll_next_unpin(cx) {
468 Poll::Pending => return Poll::Pending,
469 Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
470 Poll::Ready(Some(event)) => event,
471 };
472 match event {
473 PendingSessionEvent::Established {
474 session_id,
475 remote_addr,
476 local_addr,
477 peer_id,
478 capabilities,
479 conn,
480 status,
481 direction,
482 client_id,
483 } => {
484 self.remove_pending_session(&session_id);
486
487 if self.active_sessions.contains_key(&peer_id) {
489 trace!(
490 target: "net::session",
491 ?session_id,
492 ?remote_addr,
493 ?peer_id,
494 ?direction,
495 "already connected"
496 );
497
498 self.spawn(async move {
499 let _ =
501 conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
502 });
503
504 return Poll::Ready(SessionEvent::AlreadyConnected {
505 peer_id,
506 remote_addr,
507 direction,
508 })
509 }
510
511 let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer);
512
513 let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
514
515 let messages = PeerRequestSender::new(peer_id, to_session_tx);
516
517 let timeout = Arc::new(AtomicU64::new(
518 self.initial_internal_request_timeout.as_millis() as u64,
519 ));
520
521 let version = conn.version();
523
524 let session = ActiveSession {
525 next_id: 0,
526 remote_peer_id: peer_id,
527 remote_addr,
528 remote_capabilities: Arc::clone(&capabilities),
529 session_id,
530 commands_rx: ReceiverStream::new(commands_rx),
531 to_session_manager: self.active_session_tx.clone(),
532 pending_message_to_session: None,
533 internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
534 inflight_requests: Default::default(),
535 conn,
536 queued_outgoing: QueuedOutgoingMessages::new(
537 self.metrics.queued_outgoing_messages.clone(),
538 ),
539 received_requests_from_remote: Default::default(),
540 internal_request_timeout_interval: tokio::time::interval(
541 self.initial_internal_request_timeout,
542 ),
543 internal_request_timeout: Arc::clone(&timeout),
544 protocol_breach_request_timeout: self.protocol_breach_request_timeout,
545 terminate_message: None,
546 };
547
548 self.spawn(session);
549
550 let client_version = client_id.into();
551 let handle = ActiveSessionHandle {
552 status: status.clone(),
553 direction,
554 session_id,
555 remote_id: peer_id,
556 version,
557 established: Instant::now(),
558 capabilities: Arc::clone(&capabilities),
559 commands_to_session,
560 client_version: Arc::clone(&client_version),
561 remote_addr,
562 local_addr,
563 };
564
565 self.active_sessions.insert(peer_id, handle);
566 self.counter.inc_active(&direction);
567
568 if direction.is_outgoing() {
569 self.metrics.total_dial_successes.increment(1);
570 }
571
572 Poll::Ready(SessionEvent::SessionEstablished {
573 peer_id,
574 remote_addr,
575 client_version,
576 version,
577 capabilities,
578 status,
579 messages,
580 direction,
581 timeout,
582 })
583 }
584 PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
585 trace!(
586 target: "net::session",
587 ?session_id,
588 ?remote_addr,
589 ?error,
590 "disconnected pending session"
591 );
592 self.remove_pending_session(&session_id);
593 match direction {
594 Direction::Incoming => {
595 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
596 remote_addr,
597 error,
598 })
599 }
600 Direction::Outgoing(peer_id) => {
601 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
602 remote_addr,
603 peer_id,
604 error,
605 })
606 }
607 }
608 }
609 PendingSessionEvent::OutgoingConnectionError {
610 remote_addr,
611 session_id,
612 peer_id,
613 error,
614 } => {
615 trace!(
616 target: "net::session",
617 %error,
618 ?session_id,
619 ?remote_addr,
620 ?peer_id,
621 "connection refused"
622 );
623 self.remove_pending_session(&session_id);
624 Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
625 }
626 PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
627 trace!(
628 target: "net::session",
629 %error,
630 ?session_id,
631 ?remote_addr,
632 "ecies auth failed"
633 );
634 self.remove_pending_session(&session_id);
635 match direction {
636 Direction::Incoming => {
637 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
638 remote_addr,
639 error: Some(PendingSessionHandshakeError::Ecies(error)),
640 })
641 }
642 Direction::Outgoing(peer_id) => {
643 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
644 remote_addr,
645 peer_id,
646 error: Some(PendingSessionHandshakeError::Ecies(error)),
647 })
648 }
649 }
650 }
651 }
652 }
653
654 pub(crate) const fn update_advertised_block_range(
655 &mut self,
656 block_range_update: BlockRangeUpdate,
657 ) {
658 self.status.earliest_block = Some(block_range_update.earliest);
659 self.status.latest_block = Some(block_range_update.latest);
660 self.status.blockhash = block_range_update.latest_hash;
661 }
662}
663
664#[derive(Default, Debug, Clone)]
666struct DisconnectionsCounter(Arc<()>);
667
668impl DisconnectionsCounter {
669 const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;
670
671 fn has_capacity(&self) -> bool {
674 Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
675 }
676}
677
678#[derive(Debug)]
680pub enum SessionEvent<N: NetworkPrimitives> {
681 SessionEstablished {
685 peer_id: PeerId,
687 remote_addr: SocketAddr,
689 client_version: Arc<str>,
691 capabilities: Arc<Capabilities>,
693 version: EthVersion,
695 status: Arc<UnifiedStatus>,
697 messages: PeerRequestSender<PeerRequest<N>>,
699 direction: Direction,
701 timeout: Arc<AtomicU64>,
704 },
705 AlreadyConnected {
707 peer_id: PeerId,
709 remote_addr: SocketAddr,
711 direction: Direction,
713 },
714 ValidMessage {
716 peer_id: PeerId,
718 message: PeerMessage<N>,
720 },
721 BadMessage {
723 peer_id: PeerId,
725 },
726 ProtocolBreach {
728 peer_id: PeerId,
730 },
731 IncomingPendingSessionClosed {
733 remote_addr: SocketAddr,
735 error: Option<PendingSessionHandshakeError>,
737 },
738 OutgoingPendingSessionClosed {
740 remote_addr: SocketAddr,
742 peer_id: PeerId,
744 error: Option<PendingSessionHandshakeError>,
746 },
747 OutgoingConnectionError {
749 remote_addr: SocketAddr,
751 peer_id: PeerId,
753 error: io::Error,
755 },
756 SessionClosedOnConnectionError {
758 peer_id: PeerId,
760 remote_addr: SocketAddr,
762 error: EthStreamError,
764 },
765 Disconnected {
767 peer_id: PeerId,
769 remote_addr: SocketAddr,
771 },
772}
773
774#[derive(Debug, thiserror::Error)]
776pub enum PendingSessionHandshakeError {
777 #[error(transparent)]
779 Eth(EthStreamError),
780 #[error(transparent)]
782 Ecies(ECIESError),
783 #[error("authentication timed out")]
785 Timeout,
786 #[error("Mandatory extra capability unsupported")]
788 UnsupportedExtraCapability,
789}
790
791impl PendingSessionHandshakeError {
792 pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
794 match self {
795 Self::Eth(eth_err) => eth_err.as_disconnected(),
796 _ => None,
797 }
798 }
799}
800
801#[derive(Debug, Clone, thiserror::Error)]
804#[error("session limit reached {0}")]
805pub struct ExceedsSessionLimit(pub(crate) u32);
806
807pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
809 timeout: Duration,
810 session_id: SessionId,
811 remote_addr: SocketAddr,
812 direction: Direction,
813 events: mpsc::Sender<PendingSessionEvent<N>>,
814 f: F,
815) where
816 F: Future<Output = ()>,
817{
818 if tokio::time::timeout(timeout, f).await.is_err() {
819 trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
820 let event = PendingSessionEvent::Disconnected {
821 remote_addr,
822 session_id,
823 direction,
824 error: Some(PendingSessionHandshakeError::Timeout),
825 };
826 let _ = events.send(event).await;
827 }
828}
829
830#[expect(clippy::too_many_arguments)]
834pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
835 handshake: Arc<dyn EthRlpxHandshake>,
836 disconnect_rx: oneshot::Receiver<()>,
837 session_id: SessionId,
838 stream: TcpStream,
839 events: mpsc::Sender<PendingSessionEvent<N>>,
840 remote_addr: SocketAddr,
841 secret_key: SecretKey,
842 hello: HelloMessageWithProtocols,
843 status: UnifiedStatus,
844 fork_filter: ForkFilter,
845 extra_handlers: RlpxSubProtocolHandlers,
846) {
847 authenticate(
848 handshake,
849 disconnect_rx,
850 events,
851 stream,
852 session_id,
853 remote_addr,
854 secret_key,
855 Direction::Incoming,
856 hello,
857 status,
858 fork_filter,
859 extra_handlers,
860 )
861 .await
862}
863
864#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")]
866#[expect(clippy::too_many_arguments)]
867async fn start_pending_outbound_session<N: NetworkPrimitives>(
868 handshake: Arc<dyn EthRlpxHandshake>,
869 disconnect_rx: oneshot::Receiver<()>,
870 events: mpsc::Sender<PendingSessionEvent<N>>,
871 session_id: SessionId,
872 remote_addr: SocketAddr,
873 remote_peer_id: PeerId,
874 secret_key: SecretKey,
875 hello: HelloMessageWithProtocols,
876 status: UnifiedStatus,
877 fork_filter: ForkFilter,
878 extra_handlers: RlpxSubProtocolHandlers,
879) {
880 let stream = match TcpStream::connect(remote_addr).await {
881 Ok(stream) => {
882 if let Err(err) = stream.set_nodelay(true) {
883 tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
884 }
885 stream
886 }
887 Err(error) => {
888 let _ = events
889 .send(PendingSessionEvent::OutgoingConnectionError {
890 remote_addr,
891 session_id,
892 peer_id: remote_peer_id,
893 error,
894 })
895 .await;
896 return
897 }
898 };
899 authenticate(
900 handshake,
901 disconnect_rx,
902 events,
903 stream,
904 session_id,
905 remote_addr,
906 secret_key,
907 Direction::Outgoing(remote_peer_id),
908 hello,
909 status,
910 fork_filter,
911 extra_handlers,
912 )
913 .await
914}
915
916#[expect(clippy::too_many_arguments)]
918async fn authenticate<N: NetworkPrimitives>(
919 handshake: Arc<dyn EthRlpxHandshake>,
920 disconnect_rx: oneshot::Receiver<()>,
921 events: mpsc::Sender<PendingSessionEvent<N>>,
922 stream: TcpStream,
923 session_id: SessionId,
924 remote_addr: SocketAddr,
925 secret_key: SecretKey,
926 direction: Direction,
927 hello: HelloMessageWithProtocols,
928 status: UnifiedStatus,
929 fork_filter: ForkFilter,
930 extra_handlers: RlpxSubProtocolHandlers,
931) {
932 let local_addr = stream.local_addr().ok();
933 let stream = match get_ecies_stream(stream, secret_key, direction).await {
934 Ok(stream) => stream,
935 Err(error) => {
936 let _ = events
937 .send(PendingSessionEvent::EciesAuthError {
938 remote_addr,
939 session_id,
940 error,
941 direction,
942 })
943 .await;
944 return
945 }
946 };
947
948 let unauthed = UnauthedP2PStream::new(stream);
949
950 let auth = authenticate_stream(
951 handshake,
952 unauthed,
953 session_id,
954 remote_addr,
955 local_addr,
956 direction,
957 hello,
958 status,
959 fork_filter,
960 extra_handlers,
961 )
962 .boxed();
963
964 match futures::future::select(disconnect_rx, auth).await {
965 Either::Left((_, _)) => {
966 let _ = events
967 .send(PendingSessionEvent::Disconnected {
968 remote_addr,
969 session_id,
970 direction,
971 error: None,
972 })
973 .await;
974 }
975 Either::Right((res, _)) => {
976 let _ = events.send(res).await;
977 }
978 }
979}
980
981async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
984 stream: Io,
985 secret_key: SecretKey,
986 direction: Direction,
987) -> Result<ECIESStream<Io>, ECIESError> {
988 match direction {
989 Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
990 Direction::Outgoing(remote_peer_id) => {
991 ECIESStream::connect(stream, secret_key, remote_peer_id).await
992 }
993 }
994}
995
996#[expect(clippy::too_many_arguments)]
1003async fn authenticate_stream<N: NetworkPrimitives>(
1004 handshake: Arc<dyn EthRlpxHandshake>,
1005 stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1006 session_id: SessionId,
1007 remote_addr: SocketAddr,
1008 local_addr: Option<SocketAddr>,
1009 direction: Direction,
1010 mut hello: HelloMessageWithProtocols,
1011 mut status: UnifiedStatus,
1012 fork_filter: ForkFilter,
1013 mut extra_handlers: RlpxSubProtocolHandlers,
1014) -> PendingSessionEvent<N> {
1015 extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1017
1018 let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1020 Ok(stream_res) => stream_res,
1021 Err(err) => {
1022 return PendingSessionEvent::Disconnected {
1023 remote_addr,
1024 session_id,
1025 direction,
1026 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1027 }
1028 }
1029 };
1030
1031 if !extra_handlers.is_empty() {
1033 while let Some(pos) = extra_handlers.iter().position(|handler| {
1035 p2p_stream
1036 .shared_capabilities()
1037 .ensure_matching_capability(&handler.protocol().cap)
1038 .is_err()
1039 }) {
1040 let handler = extra_handlers.remove(pos);
1041 if handler.on_unsupported_by_peer(
1042 p2p_stream.shared_capabilities(),
1043 direction,
1044 their_hello.id,
1045 ) == OnNotSupported::Disconnect
1046 {
1047 return PendingSessionEvent::Disconnected {
1048 remote_addr,
1049 session_id,
1050 direction,
1051 error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1052 };
1053 }
1054 }
1055 }
1056
1057 let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1059 Ok(version) => version,
1060 Err(err) => {
1061 return PendingSessionEvent::Disconnected {
1062 remote_addr,
1063 session_id,
1064 direction,
1065 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1066 }
1067 }
1068 };
1069
1070 let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1071 status.set_eth_version(eth_version);
1076
1077 match handshake
1079 .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1080 .await
1081 {
1082 Ok(their_status) => {
1083 let eth_stream = EthStream::new(eth_version, p2p_stream);
1084 (eth_stream.into(), their_status)
1085 }
1086 Err(err) => {
1087 return PendingSessionEvent::Disconnected {
1088 remote_addr,
1089 session_id,
1090 direction,
1091 error: Some(PendingSessionHandshakeError::Eth(err)),
1092 }
1093 }
1094 }
1095 } else {
1096 let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1098
1099 for handler in extra_handlers.into_iter() {
1101 let cap = handler.protocol().cap;
1102 let remote_peer_id = their_hello.id;
1103
1104 multiplex_stream
1105 .install_protocol(&cap, move |conn| {
1106 handler.into_connection(direction, remote_peer_id, conn)
1107 })
1108 .ok();
1109 }
1110
1111 let (multiplex_stream, their_status) =
1112 match multiplex_stream.into_eth_satellite_stream(status, fork_filter).await {
1113 Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1114 Err(err) => {
1115 return PendingSessionEvent::Disconnected {
1116 remote_addr,
1117 session_id,
1118 direction,
1119 error: Some(PendingSessionHandshakeError::Eth(err)),
1120 }
1121 }
1122 };
1123
1124 (multiplex_stream.into(), their_status)
1125 };
1126
1127 PendingSessionEvent::Established {
1128 session_id,
1129 remote_addr,
1130 local_addr,
1131 peer_id: their_hello.id,
1132 capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1133 status: Arc::new(their_status),
1134 conn,
1135 direction,
1136 client_id: their_hello.client_version,
1137 }
1138}