reth_network/session/
mod.rs

1//! Support for handling peer sessions.
2
3mod active;
4mod conn;
5mod counter;
6mod handle;
7
8use active::QueuedOutgoingMessages;
9pub use conn::EthRlpxConnection;
10pub use handle::{
11    ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
12    SessionCommand,
13};
14
15pub use reth_network_api::{Direction, PeerInfo};
16
17use std::{
18    collections::HashMap,
19    future::Future,
20    net::SocketAddr,
21    sync::{atomic::AtomicU64, Arc},
22    task::{Context, Poll},
23    time::{Duration, Instant},
24};
25
26use crate::{
27    message::PeerMessage,
28    metrics::SessionManagerMetrics,
29    protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
30    session::active::ActiveSession,
31};
32use counter::SessionCounter;
33use futures::{future::Either, io, FutureExt, StreamExt};
34use reth_ecies::{stream::ECIESStream, ECIESError};
35use reth_eth_wire::{
36    errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
37    BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
38    HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
39    HANDSHAKE_TIMEOUT,
40};
41use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
42use reth_metrics::common::mpsc::MeteredPollSender;
43use reth_network_api::{PeerRequest, PeerRequestSender};
44use reth_network_peers::PeerId;
45use reth_network_types::SessionsConfig;
46use reth_tasks::TaskSpawner;
47use rustc_hash::FxHashMap;
48use secp256k1::SecretKey;
49use tokio::{
50    io::{AsyncRead, AsyncWrite},
51    net::TcpStream,
52    sync::{mpsc, mpsc::error::TrySendError, oneshot},
53};
54use tokio_stream::wrappers::ReceiverStream;
55use tokio_util::sync::PollSender;
56use tracing::{debug, instrument, trace};
57
58/// Internal identifier for active sessions.
59#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
60pub struct SessionId(usize);
61
62/// Manages a set of sessions.
63#[must_use = "Session Manager must be polled to process session events."]
64#[derive(Debug)]
65pub struct SessionManager<N: NetworkPrimitives> {
66    /// Tracks the identifier for the next session.
67    next_id: usize,
68    /// Keeps track of all sessions
69    counter: SessionCounter,
70    ///  The maximum initial time an [`ActiveSession`] waits for a response from the peer before it
71    /// responds to an _internal_ request with a `TimeoutError`
72    initial_internal_request_timeout: Duration,
73    /// If an [`ActiveSession`] does not receive a response at all within this duration then it is
74    /// considered a protocol violation and the session will initiate a drop.
75    protocol_breach_request_timeout: Duration,
76    /// The timeout after which a pending session attempt is considered failed.
77    pending_session_timeout: Duration,
78    /// The secret key used for authenticating sessions.
79    secret_key: SecretKey,
80    /// The `Status` message to send to peers.
81    status: UnifiedStatus,
82    /// The `HelloMessage` message to send to peers.
83    hello_message: HelloMessageWithProtocols,
84    /// The [`ForkFilter`] used to validate the peer's `Status` message.
85    fork_filter: ForkFilter,
86    /// Size of the command buffer per session.
87    session_command_buffer: usize,
88    /// The executor for spawned tasks.
89    executor: Box<dyn TaskSpawner>,
90    /// All pending session that are currently handshaking, exchanging `Hello`s.
91    ///
92    /// Events produced during the authentication phase are reported to this manager. Once the
93    /// session is authenticated, it can be moved to the `active_session` set.
94    pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
95    /// All active sessions that are ready to exchange messages.
96    active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
97    /// The original Sender half of the [`PendingSessionEvent`] channel.
98    ///
99    /// When a new (pending) session is created, the corresponding [`PendingSessionHandle`] will
100    /// get a clone of this sender half.
101    pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
102    /// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
103    pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
104    /// The original Sender half of the [`ActiveSessionMessage`] channel.
105    ///
106    /// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a
107    /// clone of this sender half.
108    active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
109    /// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions.
110    active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
111    /// Additional `RLPx` sub-protocols to be used by the session manager.
112    extra_protocols: RlpxSubProtocols,
113    /// Tracks the ongoing graceful disconnections attempts for incoming connections.
114    disconnections_counter: DisconnectionsCounter,
115    /// Metrics for the session manager.
116    metrics: SessionManagerMetrics,
117    /// The [`EthRlpxHandshake`] is used to perform the initial handshake with the peer.
118    handshake: Arc<dyn EthRlpxHandshake>,
119}
120
121// === impl SessionManager ===
122
123impl<N: NetworkPrimitives> SessionManager<N> {
124    /// Creates a new empty [`SessionManager`].
125    #[expect(clippy::too_many_arguments)]
126    pub fn new(
127        secret_key: SecretKey,
128        config: SessionsConfig,
129        executor: Box<dyn TaskSpawner>,
130        status: UnifiedStatus,
131        hello_message: HelloMessageWithProtocols,
132        fork_filter: ForkFilter,
133        extra_protocols: RlpxSubProtocols,
134        handshake: Arc<dyn EthRlpxHandshake>,
135    ) -> Self {
136        let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
137        let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
138        let active_session_tx = PollSender::new(active_session_tx);
139
140        Self {
141            next_id: 0,
142            counter: SessionCounter::new(config.limits),
143            initial_internal_request_timeout: config.initial_internal_request_timeout,
144            protocol_breach_request_timeout: config.protocol_breach_request_timeout,
145            pending_session_timeout: config.pending_session_timeout,
146            secret_key,
147            status,
148            hello_message,
149            fork_filter,
150            session_command_buffer: config.session_command_buffer,
151            executor,
152            pending_sessions: Default::default(),
153            active_sessions: Default::default(),
154            pending_sessions_tx,
155            pending_session_rx: ReceiverStream::new(pending_sessions_rx),
156            active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
157            active_session_rx: ReceiverStream::new(active_session_rx),
158            extra_protocols,
159            disconnections_counter: Default::default(),
160            metrics: Default::default(),
161            handshake,
162        }
163    }
164
165    /// Check whether the provided [`ForkId`] is compatible based on the validation rules in
166    /// `EIP-2124`.
167    pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
168        self.fork_filter.validate(fork_id).is_ok()
169    }
170
171    /// Returns the next unique [`SessionId`].
172    const fn next_id(&mut self) -> SessionId {
173        let id = self.next_id;
174        self.next_id += 1;
175        SessionId(id)
176    }
177
178    /// Returns the current status of the session.
179    pub const fn status(&self) -> UnifiedStatus {
180        self.status
181    }
182
183    /// Returns the secret key used for authenticating sessions.
184    pub const fn secret_key(&self) -> SecretKey {
185        self.secret_key
186    }
187
188    /// Returns a borrowed reference to the active sessions.
189    pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
190        &self.active_sessions
191    }
192
193    /// Returns the session hello message.
194    pub fn hello_message(&self) -> HelloMessageWithProtocols {
195        self.hello_message.clone()
196    }
197
198    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
199    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
200        self.extra_protocols.push(protocol)
201    }
202
203    /// Returns the number of currently pending connections.
204    #[inline]
205    pub(crate) fn num_pending_connections(&self) -> usize {
206        self.pending_sessions.len()
207    }
208
209    /// Spawns the given future onto a new task that is tracked in the `spawned_tasks`
210    /// [`JoinSet`](tokio::task::JoinSet).
211    fn spawn<F>(&self, f: F)
212    where
213        F: Future<Output = ()> + Send + 'static,
214    {
215        self.executor.spawn(f.boxed());
216    }
217
218    /// Invoked on a received status update.
219    ///
220    /// If the updated activated another fork, this will return a [`ForkTransition`] and updates the
221    /// active [`ForkId`]. See also [`ForkFilter::set_head`].
222    pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
223        self.status.blockhash = head.hash;
224        self.status.total_difficulty = Some(head.total_difficulty);
225        let transition = self.fork_filter.set_head(head);
226        self.status.forkid = self.fork_filter.current();
227        self.status.latest_block = Some(head.number);
228
229        transition
230    }
231
232    /// An incoming TCP connection was received. This starts the authentication process to turn this
233    /// stream into an active peer session.
234    ///
235    /// Returns an error if the configured limit has been reached.
236    pub(crate) fn on_incoming(
237        &mut self,
238        stream: TcpStream,
239        remote_addr: SocketAddr,
240    ) -> Result<SessionId, ExceedsSessionLimit> {
241        self.counter.ensure_pending_inbound()?;
242
243        let session_id = self.next_id();
244
245        trace!(
246            target: "net::session",
247            ?remote_addr,
248            ?session_id,
249            "new pending incoming session"
250        );
251
252        let (disconnect_tx, disconnect_rx) = oneshot::channel();
253        let pending_events = self.pending_sessions_tx.clone();
254        let secret_key = self.secret_key;
255        let hello_message = self.hello_message.clone();
256        let status = self.status;
257        let fork_filter = self.fork_filter.clone();
258        let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
259        self.spawn(pending_session_with_timeout(
260            self.pending_session_timeout,
261            session_id,
262            remote_addr,
263            Direction::Incoming,
264            pending_events.clone(),
265            start_pending_incoming_session(
266                self.handshake.clone(),
267                disconnect_rx,
268                session_id,
269                stream,
270                pending_events,
271                remote_addr,
272                secret_key,
273                hello_message,
274                status,
275                fork_filter,
276                extra_handlers,
277            ),
278        ));
279
280        let handle = PendingSessionHandle {
281            disconnect_tx: Some(disconnect_tx),
282            direction: Direction::Incoming,
283        };
284        self.pending_sessions.insert(session_id, handle);
285        self.counter.inc_pending_inbound();
286        Ok(session_id)
287    }
288
289    /// Starts a new pending session from the local node to the given remote node.
290    pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
291        // The error can be dropped because no dial will be made if it would exceed the limit
292        if self.counter.ensure_pending_outbound().is_ok() {
293            let session_id = self.next_id();
294            let (disconnect_tx, disconnect_rx) = oneshot::channel();
295            let pending_events = self.pending_sessions_tx.clone();
296            let secret_key = self.secret_key;
297            let hello_message = self.hello_message.clone();
298            let fork_filter = self.fork_filter.clone();
299            let status = self.status;
300            let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
301            self.spawn(pending_session_with_timeout(
302                self.pending_session_timeout,
303                session_id,
304                remote_addr,
305                Direction::Outgoing(remote_peer_id),
306                pending_events.clone(),
307                start_pending_outbound_session(
308                    self.handshake.clone(),
309                    disconnect_rx,
310                    pending_events,
311                    session_id,
312                    remote_addr,
313                    remote_peer_id,
314                    secret_key,
315                    hello_message,
316                    status,
317                    fork_filter,
318                    extra_handlers,
319                ),
320            ));
321
322            let handle = PendingSessionHandle {
323                disconnect_tx: Some(disconnect_tx),
324                direction: Direction::Outgoing(remote_peer_id),
325            };
326            self.pending_sessions.insert(session_id, handle);
327            self.counter.inc_pending_outbound();
328        }
329    }
330
331    /// Initiates a shutdown of the channel.
332    ///
333    /// This will trigger the disconnect on the session task to gracefully terminate. The result
334    /// will be picked up by the receiver.
335    pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
336        if let Some(session) = self.active_sessions.get(&node) {
337            session.disconnect(reason);
338        }
339    }
340
341    /// Initiates a shutdown of all sessions.
342    ///
343    /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
344    /// will be picked by the receiver.
345    pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
346        for session in self.active_sessions.values() {
347            session.disconnect(reason);
348        }
349    }
350
351    /// Disconnects all pending sessions.
352    pub fn disconnect_all_pending(&mut self) {
353        for session in self.pending_sessions.values_mut() {
354            session.disconnect();
355        }
356    }
357
358    /// Sends a message to the peer's session
359    pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
360        if let Some(session) = self.active_sessions.get(peer_id) {
361            let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)).inspect_err(
362                |e| {
363                    if let TrySendError::Full(_) = e {
364                        debug!(
365                            target: "net::session",
366                            ?peer_id,
367                            "session command buffer full, dropping message"
368                        );
369                        self.metrics.total_outgoing_peer_messages_dropped.increment(1);
370                    }
371                },
372            );
373        }
374    }
375
376    /// Removes the [`PendingSessionHandle`] if it exists.
377    fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
378        let session = self.pending_sessions.remove(id)?;
379        self.counter.dec_pending(&session.direction);
380        Some(session)
381    }
382
383    /// Removes the [`PendingSessionHandle`] if it exists.
384    fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
385        let session = self.active_sessions.remove(id)?;
386        self.counter.dec_active(&session.direction);
387        Some(session)
388    }
389
390    /// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and
391    /// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will
392    /// simply drop the incoming connection.
393    pub(crate) fn try_disconnect_incoming_connection(
394        &self,
395        stream: TcpStream,
396        reason: DisconnectReason,
397    ) {
398        if !self.disconnections_counter.has_capacity() {
399            // drop the connection if we don't have capacity for gracefully disconnecting
400            return
401        }
402
403        let guard = self.disconnections_counter.clone();
404        let secret_key = self.secret_key;
405
406        self.spawn(async move {
407            trace!(
408                target: "net::session",
409                "gracefully disconnecting incoming connection"
410            );
411            if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
412                let mut unauth = UnauthedP2PStream::new(stream);
413                let _ = unauth.send_disconnect(reason).await;
414                drop(guard);
415            }
416        });
417    }
418
419    /// This polls all the session handles and returns [`SessionEvent`].
420    ///
421    /// Active sessions are prioritized.
422    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
423        // Poll events from active sessions
424        match self.active_session_rx.poll_next_unpin(cx) {
425            Poll::Pending => {}
426            Poll::Ready(None) => {
427                unreachable!("Manager holds both channel halves.")
428            }
429            Poll::Ready(Some(event)) => {
430                return match event {
431                    ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
432                        trace!(
433                            target: "net::session",
434                            ?peer_id,
435                            "gracefully disconnected active session."
436                        );
437                        self.remove_active_session(&peer_id);
438                        Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
439                    }
440                    ActiveSessionMessage::ClosedOnConnectionError {
441                        peer_id,
442                        remote_addr,
443                        error,
444                    } => {
445                        trace!(target: "net::session", ?peer_id, %error,"closed session.");
446                        self.remove_active_session(&peer_id);
447                        Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
448                            remote_addr,
449                            peer_id,
450                            error,
451                        })
452                    }
453                    ActiveSessionMessage::ValidMessage { peer_id, message } => {
454                        Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
455                    }
456                    ActiveSessionMessage::BadMessage { peer_id } => {
457                        Poll::Ready(SessionEvent::BadMessage { peer_id })
458                    }
459                    ActiveSessionMessage::ProtocolBreach { peer_id } => {
460                        Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
461                    }
462                }
463            }
464        }
465
466        // Poll the pending session event stream
467        let event = match self.pending_session_rx.poll_next_unpin(cx) {
468            Poll::Pending => return Poll::Pending,
469            Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
470            Poll::Ready(Some(event)) => event,
471        };
472        match event {
473            PendingSessionEvent::Established {
474                session_id,
475                remote_addr,
476                local_addr,
477                peer_id,
478                capabilities,
479                conn,
480                status,
481                direction,
482                client_id,
483            } => {
484                // move from pending to established.
485                self.remove_pending_session(&session_id);
486
487                // If there's already a session to the peer then we disconnect right away
488                if self.active_sessions.contains_key(&peer_id) {
489                    trace!(
490                        target: "net::session",
491                        ?session_id,
492                        ?remote_addr,
493                        ?peer_id,
494                        ?direction,
495                        "already connected"
496                    );
497
498                    self.spawn(async move {
499                        // send a disconnect message
500                        let _ =
501                            conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
502                    });
503
504                    return Poll::Ready(SessionEvent::AlreadyConnected {
505                        peer_id,
506                        remote_addr,
507                        direction,
508                    })
509                }
510
511                let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer);
512
513                let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
514
515                let messages = PeerRequestSender::new(peer_id, to_session_tx);
516
517                let timeout = Arc::new(AtomicU64::new(
518                    self.initial_internal_request_timeout.as_millis() as u64,
519                ));
520
521                // negotiated version
522                let version = conn.version();
523
524                let session = ActiveSession {
525                    next_id: 0,
526                    remote_peer_id: peer_id,
527                    remote_addr,
528                    remote_capabilities: Arc::clone(&capabilities),
529                    session_id,
530                    commands_rx: ReceiverStream::new(commands_rx),
531                    to_session_manager: self.active_session_tx.clone(),
532                    pending_message_to_session: None,
533                    internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
534                    inflight_requests: Default::default(),
535                    conn,
536                    queued_outgoing: QueuedOutgoingMessages::new(
537                        self.metrics.queued_outgoing_messages.clone(),
538                    ),
539                    received_requests_from_remote: Default::default(),
540                    internal_request_timeout_interval: tokio::time::interval(
541                        self.initial_internal_request_timeout,
542                    ),
543                    internal_request_timeout: Arc::clone(&timeout),
544                    protocol_breach_request_timeout: self.protocol_breach_request_timeout,
545                    terminate_message: None,
546                };
547
548                self.spawn(session);
549
550                let client_version = client_id.into();
551                let handle = ActiveSessionHandle {
552                    status: status.clone(),
553                    direction,
554                    session_id,
555                    remote_id: peer_id,
556                    version,
557                    established: Instant::now(),
558                    capabilities: Arc::clone(&capabilities),
559                    commands_to_session,
560                    client_version: Arc::clone(&client_version),
561                    remote_addr,
562                    local_addr,
563                };
564
565                self.active_sessions.insert(peer_id, handle);
566                self.counter.inc_active(&direction);
567
568                if direction.is_outgoing() {
569                    self.metrics.total_dial_successes.increment(1);
570                }
571
572                Poll::Ready(SessionEvent::SessionEstablished {
573                    peer_id,
574                    remote_addr,
575                    client_version,
576                    version,
577                    capabilities,
578                    status,
579                    messages,
580                    direction,
581                    timeout,
582                })
583            }
584            PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
585                trace!(
586                    target: "net::session",
587                    ?session_id,
588                    ?remote_addr,
589                    ?error,
590                    "disconnected pending session"
591                );
592                self.remove_pending_session(&session_id);
593                match direction {
594                    Direction::Incoming => {
595                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
596                            remote_addr,
597                            error,
598                        })
599                    }
600                    Direction::Outgoing(peer_id) => {
601                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
602                            remote_addr,
603                            peer_id,
604                            error,
605                        })
606                    }
607                }
608            }
609            PendingSessionEvent::OutgoingConnectionError {
610                remote_addr,
611                session_id,
612                peer_id,
613                error,
614            } => {
615                trace!(
616                    target: "net::session",
617                    %error,
618                    ?session_id,
619                    ?remote_addr,
620                    ?peer_id,
621                    "connection refused"
622                );
623                self.remove_pending_session(&session_id);
624                Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
625            }
626            PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
627                trace!(
628                    target: "net::session",
629                    %error,
630                    ?session_id,
631                    ?remote_addr,
632                    "ecies auth failed"
633                );
634                self.remove_pending_session(&session_id);
635                match direction {
636                    Direction::Incoming => {
637                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
638                            remote_addr,
639                            error: Some(PendingSessionHandshakeError::Ecies(error)),
640                        })
641                    }
642                    Direction::Outgoing(peer_id) => {
643                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
644                            remote_addr,
645                            peer_id,
646                            error: Some(PendingSessionHandshakeError::Ecies(error)),
647                        })
648                    }
649                }
650            }
651        }
652    }
653
654    pub(crate) const fn update_advertised_block_range(
655        &mut self,
656        block_range_update: BlockRangeUpdate,
657    ) {
658        self.status.earliest_block = Some(block_range_update.earliest);
659        self.status.latest_block = Some(block_range_update.latest);
660        self.status.blockhash = block_range_update.latest_hash;
661    }
662}
663
664/// A counter for ongoing graceful disconnections attempts.
665#[derive(Default, Debug, Clone)]
666struct DisconnectionsCounter(Arc<()>);
667
668impl DisconnectionsCounter {
669    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;
670
671    /// Returns true if the [`DisconnectionsCounter`] still has capacity
672    /// for an additional graceful disconnection.
673    fn has_capacity(&self) -> bool {
674        Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
675    }
676}
677
678/// Events produced by the [`SessionManager`]
679#[derive(Debug)]
680pub enum SessionEvent<N: NetworkPrimitives> {
681    /// A new session was successfully authenticated.
682    ///
683    /// This session is now able to exchange data.
684    SessionEstablished {
685        /// The remote node's public key
686        peer_id: PeerId,
687        /// The remote node's socket address
688        remote_addr: SocketAddr,
689        /// The user agent of the remote node, usually containing the client name and version
690        client_version: Arc<str>,
691        /// The capabilities the remote node has announced
692        capabilities: Arc<Capabilities>,
693        /// negotiated eth version
694        version: EthVersion,
695        /// The Status message the peer sent during the `eth` handshake
696        status: Arc<UnifiedStatus>,
697        /// The channel for sending messages to the peer with the session
698        messages: PeerRequestSender<PeerRequest<N>>,
699        /// The direction of the session, either `Inbound` or `Outgoing`
700        direction: Direction,
701        /// The maximum time that the session waits for a response from the peer before timing out
702        /// the connection
703        timeout: Arc<AtomicU64>,
704    },
705    /// The peer was already connected with another session.
706    AlreadyConnected {
707        /// The remote node's public key
708        peer_id: PeerId,
709        /// The remote node's socket address
710        remote_addr: SocketAddr,
711        /// The direction of the session, either `Inbound` or `Outgoing`
712        direction: Direction,
713    },
714    /// A session received a valid message via `RLPx`.
715    ValidMessage {
716        /// The remote node's public key
717        peer_id: PeerId,
718        /// Message received from the peer.
719        message: PeerMessage<N>,
720    },
721    /// Received a bad message from the peer.
722    BadMessage {
723        /// Identifier of the remote peer.
724        peer_id: PeerId,
725    },
726    /// Remote peer is considered in protocol violation
727    ProtocolBreach {
728        /// Identifier of the remote peer.
729        peer_id: PeerId,
730    },
731    /// Closed an incoming pending session during handshaking.
732    IncomingPendingSessionClosed {
733        /// The remote node's socket address
734        remote_addr: SocketAddr,
735        /// The pending handshake session error that caused the session to close
736        error: Option<PendingSessionHandshakeError>,
737    },
738    /// Closed an outgoing pending session during handshaking.
739    OutgoingPendingSessionClosed {
740        /// The remote node's socket address
741        remote_addr: SocketAddr,
742        /// The remote node's public key
743        peer_id: PeerId,
744        /// The pending handshake session error that caused the session to close
745        error: Option<PendingSessionHandshakeError>,
746    },
747    /// Failed to establish a tcp stream
748    OutgoingConnectionError {
749        /// The remote node's socket address
750        remote_addr: SocketAddr,
751        /// The remote node's public key
752        peer_id: PeerId,
753        /// The error that caused the outgoing connection to fail
754        error: io::Error,
755    },
756    /// Session was closed due to an error
757    SessionClosedOnConnectionError {
758        /// The id of the remote peer.
759        peer_id: PeerId,
760        /// The socket we were connected to.
761        remote_addr: SocketAddr,
762        /// The error that caused the session to close
763        error: EthStreamError,
764    },
765    /// Active session was gracefully disconnected.
766    Disconnected {
767        /// The remote node's public key
768        peer_id: PeerId,
769        /// The remote node's socket address that we were connected to
770        remote_addr: SocketAddr,
771    },
772}
773
774/// Errors that can occur during handshaking/authenticating the underlying streams.
775#[derive(Debug, thiserror::Error)]
776pub enum PendingSessionHandshakeError {
777    /// The pending session failed due to an error while establishing the `eth` stream
778    #[error(transparent)]
779    Eth(EthStreamError),
780    /// The pending session failed due to an error while establishing the ECIES stream
781    #[error(transparent)]
782    Ecies(ECIESError),
783    /// Thrown when the authentication timed out
784    #[error("authentication timed out")]
785    Timeout,
786    /// Thrown when the remote lacks the required capability
787    #[error("Mandatory extra capability unsupported")]
788    UnsupportedExtraCapability,
789}
790
791impl PendingSessionHandshakeError {
792    /// Returns the [`DisconnectReason`] if the error is a disconnect message
793    pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
794        match self {
795            Self::Eth(eth_err) => eth_err.as_disconnected(),
796            _ => None,
797        }
798    }
799}
800
801/// The error thrown when the max configured limit has been reached and no more connections are
802/// accepted.
803#[derive(Debug, Clone, thiserror::Error)]
804#[error("session limit reached {0}")]
805pub struct ExceedsSessionLimit(pub(crate) u32);
806
807/// Starts a pending session authentication with a timeout.
808pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
809    timeout: Duration,
810    session_id: SessionId,
811    remote_addr: SocketAddr,
812    direction: Direction,
813    events: mpsc::Sender<PendingSessionEvent<N>>,
814    f: F,
815) where
816    F: Future<Output = ()>,
817{
818    if tokio::time::timeout(timeout, f).await.is_err() {
819        trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
820        let event = PendingSessionEvent::Disconnected {
821            remote_addr,
822            session_id,
823            direction,
824            error: Some(PendingSessionHandshakeError::Timeout),
825        };
826        let _ = events.send(event).await;
827    }
828}
829
830/// Starts the authentication process for a connection initiated by a remote peer.
831///
832/// This will wait for the _incoming_ handshake request and answer it.
833#[expect(clippy::too_many_arguments)]
834pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
835    handshake: Arc<dyn EthRlpxHandshake>,
836    disconnect_rx: oneshot::Receiver<()>,
837    session_id: SessionId,
838    stream: TcpStream,
839    events: mpsc::Sender<PendingSessionEvent<N>>,
840    remote_addr: SocketAddr,
841    secret_key: SecretKey,
842    hello: HelloMessageWithProtocols,
843    status: UnifiedStatus,
844    fork_filter: ForkFilter,
845    extra_handlers: RlpxSubProtocolHandlers,
846) {
847    authenticate(
848        handshake,
849        disconnect_rx,
850        events,
851        stream,
852        session_id,
853        remote_addr,
854        secret_key,
855        Direction::Incoming,
856        hello,
857        status,
858        fork_filter,
859        extra_handlers,
860    )
861    .await
862}
863
864/// Starts the authentication process for a connection initiated by a remote peer.
865#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")]
866#[expect(clippy::too_many_arguments)]
867async fn start_pending_outbound_session<N: NetworkPrimitives>(
868    handshake: Arc<dyn EthRlpxHandshake>,
869    disconnect_rx: oneshot::Receiver<()>,
870    events: mpsc::Sender<PendingSessionEvent<N>>,
871    session_id: SessionId,
872    remote_addr: SocketAddr,
873    remote_peer_id: PeerId,
874    secret_key: SecretKey,
875    hello: HelloMessageWithProtocols,
876    status: UnifiedStatus,
877    fork_filter: ForkFilter,
878    extra_handlers: RlpxSubProtocolHandlers,
879) {
880    let stream = match TcpStream::connect(remote_addr).await {
881        Ok(stream) => {
882            if let Err(err) = stream.set_nodelay(true) {
883                tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
884            }
885            stream
886        }
887        Err(error) => {
888            let _ = events
889                .send(PendingSessionEvent::OutgoingConnectionError {
890                    remote_addr,
891                    session_id,
892                    peer_id: remote_peer_id,
893                    error,
894                })
895                .await;
896            return
897        }
898    };
899    authenticate(
900        handshake,
901        disconnect_rx,
902        events,
903        stream,
904        session_id,
905        remote_addr,
906        secret_key,
907        Direction::Outgoing(remote_peer_id),
908        hello,
909        status,
910        fork_filter,
911        extra_handlers,
912    )
913    .await
914}
915
916/// Authenticates a session
917#[expect(clippy::too_many_arguments)]
918async fn authenticate<N: NetworkPrimitives>(
919    handshake: Arc<dyn EthRlpxHandshake>,
920    disconnect_rx: oneshot::Receiver<()>,
921    events: mpsc::Sender<PendingSessionEvent<N>>,
922    stream: TcpStream,
923    session_id: SessionId,
924    remote_addr: SocketAddr,
925    secret_key: SecretKey,
926    direction: Direction,
927    hello: HelloMessageWithProtocols,
928    status: UnifiedStatus,
929    fork_filter: ForkFilter,
930    extra_handlers: RlpxSubProtocolHandlers,
931) {
932    let local_addr = stream.local_addr().ok();
933    let stream = match get_ecies_stream(stream, secret_key, direction).await {
934        Ok(stream) => stream,
935        Err(error) => {
936            let _ = events
937                .send(PendingSessionEvent::EciesAuthError {
938                    remote_addr,
939                    session_id,
940                    error,
941                    direction,
942                })
943                .await;
944            return
945        }
946    };
947
948    let unauthed = UnauthedP2PStream::new(stream);
949
950    let auth = authenticate_stream(
951        handshake,
952        unauthed,
953        session_id,
954        remote_addr,
955        local_addr,
956        direction,
957        hello,
958        status,
959        fork_filter,
960        extra_handlers,
961    )
962    .boxed();
963
964    match futures::future::select(disconnect_rx, auth).await {
965        Either::Left((_, _)) => {
966            let _ = events
967                .send(PendingSessionEvent::Disconnected {
968                    remote_addr,
969                    session_id,
970                    direction,
971                    error: None,
972                })
973                .await;
974        }
975        Either::Right((res, _)) => {
976            let _ = events.send(res).await;
977        }
978    }
979}
980
981/// Returns an [`ECIESStream`] if it can be built. If not, send a
982/// [`PendingSessionEvent::EciesAuthError`] and returns `None`
983async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
984    stream: Io,
985    secret_key: SecretKey,
986    direction: Direction,
987) -> Result<ECIESStream<Io>, ECIESError> {
988    match direction {
989        Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
990        Direction::Outgoing(remote_peer_id) => {
991            ECIESStream::connect(stream, secret_key, remote_peer_id).await
992        }
993    }
994}
995
996/// Authenticate the stream via handshake
997///
998/// On Success return the authenticated stream as [`PendingSessionEvent`].
999///
1000/// If additional [`RlpxSubProtocolHandlers`] are provided, the hello message will be updated to
1001/// also negotiate the additional protocols.
1002#[expect(clippy::too_many_arguments)]
1003async fn authenticate_stream<N: NetworkPrimitives>(
1004    handshake: Arc<dyn EthRlpxHandshake>,
1005    stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1006    session_id: SessionId,
1007    remote_addr: SocketAddr,
1008    local_addr: Option<SocketAddr>,
1009    direction: Direction,
1010    mut hello: HelloMessageWithProtocols,
1011    mut status: UnifiedStatus,
1012    fork_filter: ForkFilter,
1013    mut extra_handlers: RlpxSubProtocolHandlers,
1014) -> PendingSessionEvent<N> {
1015    // Add extra protocols to the hello message
1016    extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1017
1018    // conduct the p2p rlpx handshake and return the rlpx authenticated stream
1019    let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1020        Ok(stream_res) => stream_res,
1021        Err(err) => {
1022            return PendingSessionEvent::Disconnected {
1023                remote_addr,
1024                session_id,
1025                direction,
1026                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1027            }
1028        }
1029    };
1030
1031    // if we have extra handlers, check if it must be supported by the remote
1032    if !extra_handlers.is_empty() {
1033        // ensure that no extra handlers that aren't supported are not mandatory
1034        while let Some(pos) = extra_handlers.iter().position(|handler| {
1035            p2p_stream
1036                .shared_capabilities()
1037                .ensure_matching_capability(&handler.protocol().cap)
1038                .is_err()
1039        }) {
1040            let handler = extra_handlers.remove(pos);
1041            if handler.on_unsupported_by_peer(
1042                p2p_stream.shared_capabilities(),
1043                direction,
1044                their_hello.id,
1045            ) == OnNotSupported::Disconnect
1046            {
1047                return PendingSessionEvent::Disconnected {
1048                    remote_addr,
1049                    session_id,
1050                    direction,
1051                    error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1052                };
1053            }
1054        }
1055    }
1056
1057    // Ensure we negotiated mandatory eth protocol
1058    let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1059        Ok(version) => version,
1060        Err(err) => {
1061            return PendingSessionEvent::Disconnected {
1062                remote_addr,
1063                session_id,
1064                direction,
1065                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1066            }
1067        }
1068    };
1069
1070    let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1071        // if the shared caps are 1, we know both support the eth version
1072        // if the hello handshake was successful we can try status handshake
1073        //
1074        // Before trying status handshake, set up the version to negotiated shared version
1075        status.set_eth_version(eth_version);
1076
1077        // perform the eth protocol handshake
1078        match handshake
1079            .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1080            .await
1081        {
1082            Ok(their_status) => {
1083                let eth_stream = EthStream::new(eth_version, p2p_stream);
1084                (eth_stream.into(), their_status)
1085            }
1086            Err(err) => {
1087                return PendingSessionEvent::Disconnected {
1088                    remote_addr,
1089                    session_id,
1090                    direction,
1091                    error: Some(PendingSessionHandshakeError::Eth(err)),
1092                }
1093            }
1094        }
1095    } else {
1096        // Multiplex the stream with the extra protocols
1097        let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1098
1099        // install additional handlers
1100        for handler in extra_handlers.into_iter() {
1101            let cap = handler.protocol().cap;
1102            let remote_peer_id = their_hello.id;
1103
1104            multiplex_stream
1105                .install_protocol(&cap, move |conn| {
1106                    handler.into_connection(direction, remote_peer_id, conn)
1107                })
1108                .ok();
1109        }
1110
1111        let (multiplex_stream, their_status) =
1112            match multiplex_stream.into_eth_satellite_stream(status, fork_filter).await {
1113                Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1114                Err(err) => {
1115                    return PendingSessionEvent::Disconnected {
1116                        remote_addr,
1117                        session_id,
1118                        direction,
1119                        error: Some(PendingSessionHandshakeError::Eth(err)),
1120                    }
1121                }
1122            };
1123
1124        (multiplex_stream.into(), their_status)
1125    };
1126
1127    PendingSessionEvent::Established {
1128        session_id,
1129        remote_addr,
1130        local_addr,
1131        peer_id: their_hello.id,
1132        capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1133        status: Arc::new(their_status),
1134        conn,
1135        direction,
1136        client_id: their_hello.client_version,
1137    }
1138}