reth_network/session/
mod.rs

1//! Support for handling peer sessions.
2
3mod active;
4mod conn;
5mod counter;
6mod handle;
7
8use active::QueuedOutgoingMessages;
9pub use conn::EthRlpxConnection;
10pub use handle::{
11    ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
12    SessionCommand,
13};
14
15pub use reth_network_api::{Direction, PeerInfo};
16
17use std::{
18    collections::HashMap,
19    future::Future,
20    net::SocketAddr,
21    sync::{atomic::AtomicU64, Arc},
22    task::{Context, Poll},
23    time::{Duration, Instant},
24};
25
26use crate::{
27    message::PeerMessage,
28    metrics::SessionManagerMetrics,
29    protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols},
30    session::active::ActiveSession,
31};
32use counter::SessionCounter;
33use futures::{future::Either, io, FutureExt, StreamExt};
34use reth_ecies::{stream::ECIESStream, ECIESError};
35use reth_eth_wire::{
36    capability::CapabilityMessage, errors::EthStreamError, multiplex::RlpxProtocolMultiplexer,
37    Capabilities, DisconnectReason, EthVersion, HelloMessageWithProtocols, NetworkPrimitives,
38    Status, UnauthedEthStream, UnauthedP2PStream,
39};
40use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
41use reth_metrics::common::mpsc::MeteredPollSender;
42use reth_network_api::{PeerRequest, PeerRequestSender};
43use reth_network_peers::PeerId;
44use reth_network_types::SessionsConfig;
45use reth_tasks::TaskSpawner;
46use rustc_hash::FxHashMap;
47use secp256k1::SecretKey;
48use tokio::{
49    io::{AsyncRead, AsyncWrite},
50    net::TcpStream,
51    sync::{mpsc, mpsc::error::TrySendError, oneshot},
52};
53use tokio_stream::wrappers::ReceiverStream;
54use tokio_util::sync::PollSender;
55use tracing::{debug, instrument, trace};
56
/// Internal identifier for a session.
///
/// A fresh id is handed out by the [`SessionManager`] whenever a new pending session is created,
/// and the same id is carried over to the active session once the handshake succeeds.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
60
/// Manages a set of sessions.
///
/// The manager owns the handles of all pending (handshaking) and active sessions, spawns the
/// session tasks on the configured executor, and surfaces their events through `poll`.
#[must_use = "Session Manager must be polled to process session events."]
#[derive(Debug)]
pub struct SessionManager<N: NetworkPrimitives> {
    /// Tracks the identifier for the next session.
    next_id: usize,
    /// Keeps track of the number of pending and active sessions against the configured limits.
    counter: SessionCounter,
    /// The maximum initial time an [`ActiveSession`] waits for a response from the peer before it
    /// responds to an _internal_ request with a `TimeoutError`
    initial_internal_request_timeout: Duration,
    /// If an [`ActiveSession`] does not receive a response at all within this duration then it is
    /// considered a protocol violation and the session will initiate a drop.
    protocol_breach_request_timeout: Duration,
    /// The timeout after which a pending session attempt is considered failed.
    pending_session_timeout: Duration,
    /// The secret key used for authenticating sessions.
    secret_key: SecretKey,
    /// The `Status` message to send to peers.
    status: Status,
    /// The `HelloMessage` message to send to peers.
    hello_message: HelloMessageWithProtocols,
    /// The [`ForkFilter`] used to validate the peer's `Status` message.
    fork_filter: ForkFilter,
    /// Size of the command buffer per session.
    session_command_buffer: usize,
    /// The executor for spawned tasks.
    executor: Box<dyn TaskSpawner>,
    /// All pending sessions that are currently handshaking, exchanging `Hello`s.
    ///
    /// Events produced during the authentication phase are reported to this manager. Once the
    /// session is authenticated, it can be moved to the `active_sessions` set.
    pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
    /// All active sessions that are ready to exchange messages, keyed by the remote peer's id.
    active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
    /// The original Sender half of the [`PendingSessionEvent`] channel.
    ///
    /// When a new (pending) session is created, the corresponding [`PendingSessionHandle`] will
    /// get a clone of this sender half.
    pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
    /// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
    pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
    /// The original Sender half of the [`ActiveSessionMessage`] channel.
    ///
    /// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a
    /// clone of this sender half.
    active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
    /// Receiver half that listens for [`ActiveSessionMessage`] produced by active sessions.
    active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
    /// Additional `RLPx` sub-protocols to be used by the session manager.
    extra_protocols: RlpxSubProtocols,
    /// Tracks the ongoing graceful disconnection attempts for incoming connections.
    disconnections_counter: DisconnectionsCounter,
    /// Metrics for the session manager.
    metrics: SessionManagerMetrics,
}
117
118// === impl SessionManager ===
119
120impl<N: NetworkPrimitives> SessionManager<N> {
121    /// Creates a new empty [`SessionManager`].
122    #[allow(clippy::too_many_arguments)]
123    pub fn new(
124        secret_key: SecretKey,
125        config: SessionsConfig,
126        executor: Box<dyn TaskSpawner>,
127        status: Status,
128        hello_message: HelloMessageWithProtocols,
129        fork_filter: ForkFilter,
130        extra_protocols: RlpxSubProtocols,
131    ) -> Self {
132        let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
133        let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
134        let active_session_tx = PollSender::new(active_session_tx);
135
136        Self {
137            next_id: 0,
138            counter: SessionCounter::new(config.limits),
139            initial_internal_request_timeout: config.initial_internal_request_timeout,
140            protocol_breach_request_timeout: config.protocol_breach_request_timeout,
141            pending_session_timeout: config.pending_session_timeout,
142            secret_key,
143            status,
144            hello_message,
145            fork_filter,
146            session_command_buffer: config.session_command_buffer,
147            executor,
148            pending_sessions: Default::default(),
149            active_sessions: Default::default(),
150            pending_sessions_tx,
151            pending_session_rx: ReceiverStream::new(pending_sessions_rx),
152            active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
153            active_session_rx: ReceiverStream::new(active_session_rx),
154            extra_protocols,
155            disconnections_counter: Default::default(),
156            metrics: Default::default(),
157        }
158    }
159
    /// Check whether the provided [`ForkId`] is compatible based on the validation rules in
    /// `EIP-2124`.
    ///
    /// Returns `true` when the manager's [`ForkFilter`] accepts `fork_id`; the concrete
    /// validation error is discarded.
    pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
        self.fork_filter.validate(fork_id).is_ok()
    }
165
166    /// Returns the next unique [`SessionId`].
167    fn next_id(&mut self) -> SessionId {
168        let id = self.next_id;
169        self.next_id += 1;
170        SessionId(id)
171    }
172
    /// Returns a copy of the [`Status`] message this manager currently holds for sending to
    /// peers.
    ///
    /// The stored status is modified by `on_status_update`.
    pub const fn status(&self) -> Status {
        self.status
    }
177
    /// Returns a copy of the secret key used for authenticating sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.secret_key
    }
182
    /// Returns a borrowed reference to the active sessions, keyed by the remote [`PeerId`].
    pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
        &self.active_sessions
    }
187
    /// Returns a clone of the session hello message, i.e. the `Hello` message (with protocols)
    /// that this manager passes to every new pending session.
    pub fn hello_message(&self) -> HelloMessageWithProtocols {
        self.hello_message.clone()
    }
192
    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
    ///
    /// The list is consulted for connection handlers each time a new incoming or outgoing
    /// pending session is started.
    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.extra_protocols.push(protocol)
    }
197
    /// Returns the number of currently pending connections, i.e. the number of sessions that are
    /// still tracked in `pending_sessions`.
    #[inline]
    pub(crate) fn num_pending_connections(&self) -> usize {
        self.pending_sessions.len()
    }
203
204    /// Spawns the given future onto a new task that is tracked in the `spawned_tasks`
205    /// [`JoinSet`](tokio::task::JoinSet).
206    fn spawn<F>(&self, f: F)
207    where
208        F: Future<Output = ()> + Send + 'static,
209    {
210        self.executor.spawn(f.boxed());
211    }
212
213    /// Invoked on a received status update.
214    ///
215    /// If the updated activated another fork, this will return a [`ForkTransition`] and updates the
216    /// active [`ForkId`]. See also [`ForkFilter::set_head`].
217    pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
218        self.status.blockhash = head.hash;
219        self.status.total_difficulty = head.total_difficulty;
220        let transition = self.fork_filter.set_head(head);
221        self.status.forkid = self.fork_filter.current();
222        transition
223    }
224
225    /// An incoming TCP connection was received. This starts the authentication process to turn this
226    /// stream into an active peer session.
227    ///
228    /// Returns an error if the configured limit has been reached.
229    pub(crate) fn on_incoming(
230        &mut self,
231        stream: TcpStream,
232        remote_addr: SocketAddr,
233    ) -> Result<SessionId, ExceedsSessionLimit> {
234        self.counter.ensure_pending_inbound()?;
235
236        let session_id = self.next_id();
237
238        trace!(
239            target: "net::session",
240            ?remote_addr,
241            ?session_id,
242            "new pending incoming session"
243        );
244
245        let (disconnect_tx, disconnect_rx) = oneshot::channel();
246        let pending_events = self.pending_sessions_tx.clone();
247        let secret_key = self.secret_key;
248        let hello_message = self.hello_message.clone();
249        let status = self.status;
250        let fork_filter = self.fork_filter.clone();
251        let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
252        self.spawn(pending_session_with_timeout(
253            self.pending_session_timeout,
254            session_id,
255            remote_addr,
256            Direction::Incoming,
257            pending_events.clone(),
258            start_pending_incoming_session(
259                disconnect_rx,
260                session_id,
261                stream,
262                pending_events,
263                remote_addr,
264                secret_key,
265                hello_message,
266                status,
267                fork_filter,
268                extra_handlers,
269            ),
270        ));
271
272        let handle = PendingSessionHandle {
273            disconnect_tx: Some(disconnect_tx),
274            direction: Direction::Incoming,
275        };
276        self.pending_sessions.insert(session_id, handle);
277        self.counter.inc_pending_inbound();
278        Ok(session_id)
279    }
280
281    /// Starts a new pending session from the local node to the given remote node.
282    pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
283        // The error can be dropped because no dial will be made if it would exceed the limit
284        if self.counter.ensure_pending_outbound().is_ok() {
285            let session_id = self.next_id();
286            let (disconnect_tx, disconnect_rx) = oneshot::channel();
287            let pending_events = self.pending_sessions_tx.clone();
288            let secret_key = self.secret_key;
289            let hello_message = self.hello_message.clone();
290            let fork_filter = self.fork_filter.clone();
291            let status = self.status;
292            let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
293            self.spawn(pending_session_with_timeout(
294                self.pending_session_timeout,
295                session_id,
296                remote_addr,
297                Direction::Outgoing(remote_peer_id),
298                pending_events.clone(),
299                start_pending_outbound_session(
300                    disconnect_rx,
301                    pending_events,
302                    session_id,
303                    remote_addr,
304                    remote_peer_id,
305                    secret_key,
306                    hello_message,
307                    status,
308                    fork_filter,
309                    extra_handlers,
310                ),
311            ));
312
313            let handle = PendingSessionHandle {
314                disconnect_tx: Some(disconnect_tx),
315                direction: Direction::Outgoing(remote_peer_id),
316            };
317            self.pending_sessions.insert(session_id, handle);
318            self.counter.inc_pending_outbound();
319        }
320    }
321
322    /// Initiates a shutdown of the channel.
323    ///
324    /// This will trigger the disconnect on the session task to gracefully terminate. The result
325    /// will be picked up by the receiver.
326    pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
327        if let Some(session) = self.active_sessions.get(&node) {
328            session.disconnect(reason);
329        }
330    }
331
332    /// Initiates a shutdown of all sessions.
333    ///
334    /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
335    /// will be picked by the receiver.
336    pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
337        for session in self.active_sessions.values() {
338            session.disconnect(reason);
339        }
340    }
341
342    /// Disconnects all pending sessions.
343    pub fn disconnect_all_pending(&mut self) {
344        for session in self.pending_sessions.values_mut() {
345            session.disconnect();
346        }
347    }
348
349    /// Sends a message to the peer's session
350    pub fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage<N>) {
351        if let Some(session) = self.active_sessions.get_mut(peer_id) {
352            let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)).inspect_err(
353                |e| {
354                    if let TrySendError::Full(_) = e {
355                        debug!(
356                            target: "net::session",
357                            ?peer_id,
358                            "session command buffer full, dropping message"
359                        );
360                        self.metrics.total_outgoing_peer_messages_dropped.increment(1);
361                    }
362                },
363            );
364        }
365    }
366
367    /// Removes the [`PendingSessionHandle`] if it exists.
368    fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
369        let session = self.pending_sessions.remove(id)?;
370        self.counter.dec_pending(&session.direction);
371        Some(session)
372    }
373
374    /// Removes the [`PendingSessionHandle`] if it exists.
375    fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
376        let session = self.active_sessions.remove(id)?;
377        self.counter.dec_active(&session.direction);
378        Some(session)
379    }
380
381    /// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and
382    /// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will
383    /// simply drop the incoming connection.
384    pub(crate) fn try_disconnect_incoming_connection(
385        &self,
386        stream: TcpStream,
387        reason: DisconnectReason,
388    ) {
389        if !self.disconnections_counter.has_capacity() {
390            // drop the connection if we don't have capacity for gracefully disconnecting
391            return
392        }
393
394        let guard = self.disconnections_counter.clone();
395        let secret_key = self.secret_key;
396
397        self.spawn(async move {
398            trace!(
399                target: "net::session",
400                "gracefully disconnecting incoming connection"
401            );
402            if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
403                let mut unauth = UnauthedP2PStream::new(stream);
404                let _ = unauth.send_disconnect(reason).await;
405                drop(guard);
406            }
407        });
408    }
409
    /// This polls the session event channels and returns the next [`SessionEvent`].
    ///
    /// Active sessions are prioritized: the stream of active session messages is polled first,
    /// and the stream of pending session events is only polled when no active session message is
    /// ready. At most one event is produced per call.
    ///
    /// Besides translating channel messages into [`SessionEvent`]s this also keeps the manager's
    /// bookkeeping in sync: closed sessions are removed from the pending/active sets, and an
    /// established pending session is promoted to an active session whose task is spawned here.
    ///
    /// Returns [`Poll::Pending`] only if both streams are pending.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
        // Poll events from active sessions
        match self.active_session_rx.poll_next_unpin(cx) {
            Poll::Pending => {}
            Poll::Ready(None) => {
                unreachable!("Manager holds both channel halves.")
            }
            Poll::Ready(Some(event)) => {
                return match event {
                    ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
                        trace!(
                            target: "net::session",
                            ?peer_id,
                            "gracefully disconnected active session."
                        );
                        self.remove_active_session(&peer_id);
                        Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
                    }
                    ActiveSessionMessage::ClosedOnConnectionError {
                        peer_id,
                        remote_addr,
                        error,
                    } => {
                        trace!(target: "net::session", ?peer_id, %error,"closed session.");
                        self.remove_active_session(&peer_id);
                        Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
                            remote_addr,
                            peer_id,
                            error,
                        })
                    }
                    // The remaining messages do not change the manager's state and are forwarded
                    // as the matching `SessionEvent`.
                    ActiveSessionMessage::ValidMessage { peer_id, message } => {
                        Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
                    }
                    ActiveSessionMessage::InvalidMessage { peer_id, capabilities, message } => {
                        Poll::Ready(SessionEvent::InvalidMessage { peer_id, message, capabilities })
                    }
                    ActiveSessionMessage::BadMessage { peer_id } => {
                        Poll::Ready(SessionEvent::BadMessage { peer_id })
                    }
                    ActiveSessionMessage::ProtocolBreach { peer_id } => {
                        Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
                    }
                }
            }
        }

        // Poll the pending session event stream
        let event = match self.pending_session_rx.poll_next_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
            Poll::Ready(Some(event)) => event,
        };
        match event {
            PendingSessionEvent::Established {
                session_id,
                remote_addr,
                local_addr,
                peer_id,
                capabilities,
                conn,
                status,
                direction,
                client_id,
            } => {
                // move from pending to established.
                self.remove_pending_session(&session_id);

                // If there's already a session to the peer then we disconnect right away
                if self.active_sessions.contains_key(&peer_id) {
                    trace!(
                        target: "net::session",
                        ?session_id,
                        ?remote_addr,
                        ?peer_id,
                        ?direction,
                        "already connected"
                    );

                    self.spawn(async move {
                        // send a disconnect message
                        let _ =
                            conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
                    });

                    return Poll::Ready(SessionEvent::AlreadyConnected {
                        peer_id,
                        remote_addr,
                        direction,
                    })
                }

                // Channel for commands from the manager (via the session handle) to the session.
                let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer);

                // Channel for peer requests; the sender half is handed out with the
                // `SessionEstablished` event.
                let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);

                let messages = PeerRequestSender::new(peer_id, to_session_tx);

                // The request timeout in milliseconds, shared between the session and the
                // receiver of the `SessionEstablished` event.
                let timeout = Arc::new(AtomicU64::new(
                    self.initial_internal_request_timeout.as_millis() as u64,
                ));

                // negotiated version
                let version = conn.version();

                let session = ActiveSession {
                    next_id: 0,
                    remote_peer_id: peer_id,
                    remote_addr,
                    remote_capabilities: Arc::clone(&capabilities),
                    session_id,
                    commands_rx: ReceiverStream::new(commands_rx),
                    to_session_manager: self.active_session_tx.clone(),
                    pending_message_to_session: None,
                    internal_request_tx: ReceiverStream::new(messages_rx).fuse(),
                    inflight_requests: Default::default(),
                    conn,
                    queued_outgoing: QueuedOutgoingMessages::new(
                        self.metrics.queued_outgoing_messages.clone(),
                    ),
                    received_requests_from_remote: Default::default(),
                    internal_request_timeout_interval: tokio::time::interval(
                        self.initial_internal_request_timeout,
                    ),
                    internal_request_timeout: Arc::clone(&timeout),
                    protocol_breach_request_timeout: self.protocol_breach_request_timeout,
                    terminate_message: None,
                };

                // The session itself is spawned as the task.
                self.spawn(session);

                let client_version = client_id.into();
                let handle = ActiveSessionHandle {
                    status: status.clone(),
                    direction,
                    session_id,
                    remote_id: peer_id,
                    version,
                    established: Instant::now(),
                    capabilities: Arc::clone(&capabilities),
                    commands_to_session,
                    client_version: Arc::clone(&client_version),
                    remote_addr,
                    local_addr,
                };

                self.active_sessions.insert(peer_id, handle);
                self.counter.inc_active(&direction);

                if direction.is_outgoing() {
                    self.metrics.total_dial_successes.increment(1);
                }

                Poll::Ready(SessionEvent::SessionEstablished {
                    peer_id,
                    remote_addr,
                    client_version,
                    version,
                    capabilities,
                    status,
                    messages,
                    direction,
                    timeout,
                })
            }
            PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
                trace!(
                    target: "net::session",
                    ?session_id,
                    ?remote_addr,
                    ?error,
                    "disconnected pending session"
                );
                self.remove_pending_session(&session_id);
                // Report the closed handshake according to who initiated the connection.
                match direction {
                    Direction::Incoming => {
                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
                            remote_addr,
                            error,
                        })
                    }
                    Direction::Outgoing(peer_id) => {
                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
                            remote_addr,
                            peer_id,
                            error,
                        })
                    }
                }
            }
            PendingSessionEvent::OutgoingConnectionError {
                remote_addr,
                session_id,
                peer_id,
                error,
            } => {
                trace!(
                    target: "net::session",
                    %error,
                    ?session_id,
                    ?remote_addr,
                    ?peer_id,
                    "connection refused"
                );
                self.remove_pending_session(&session_id);
                Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
            }
            PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
                trace!(
                    target: "net::session",
                    %error,
                    ?session_id,
                    ?remote_addr,
                    "ecies auth failed"
                );
                self.remove_pending_session(&session_id);
                // The ECIES error is surfaced as a handshake error of the closed pending session.
                match direction {
                    Direction::Incoming => {
                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
                            remote_addr,
                            error: Some(PendingSessionHandshakeError::Ecies(error)),
                        })
                    }
                    Direction::Outgoing(peer_id) => {
                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
                            remote_addr,
                            peer_id,
                            error: Some(PendingSessionHandshakeError::Ecies(error)),
                        })
                    }
                }
            }
        }
    }
647}
648
/// Counts the graceful disconnection attempts that are currently in flight.
///
/// The count is the reference count of the shared [`Arc`]: every in-flight attempt holds a clone
/// of the counter and releases its slot by dropping that clone.
#[derive(Default, Debug, Clone)]
struct DisconnectionsCounter(Arc<()>);

impl DisconnectionsCounter {
    /// Upper bound on graceful disconnections that may run at the same time.
    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;

    /// Returns `true` while one more graceful disconnection may still be started.
    fn has_capacity(&self) -> bool {
        // `self` accounts for one strong reference; every other one is an in-flight attempt.
        let in_flight = Arc::strong_count(&self.0) - 1;
        in_flight < Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
    }
}
662
/// Events produced by the [`SessionManager`]
///
/// Each call to the manager's `poll` yields at most one of these.
#[derive(Debug)]
pub enum SessionEvent<N: NetworkPrimitives> {
    /// A new session was successfully authenticated.
    ///
    /// This session is now able to exchange data.
    SessionEstablished {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The user agent of the remote node, usually containing the client name and version
        client_version: Arc<str>,
        /// The capabilities the remote node has announced
        capabilities: Arc<Capabilities>,
        /// negotiated eth version
        version: EthVersion,
        /// The Status message the peer sent during the `eth` handshake
        status: Arc<Status>,
        /// The channel for sending messages to the peer with the session
        messages: PeerRequestSender<PeerRequest<N>>,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
        /// The maximum time that the session waits for a response from the peer before timing out
        /// the connection
        timeout: Arc<AtomicU64>,
    },
    /// The peer was already connected with another session.
    AlreadyConnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
    },
    /// A session received a valid message via `RLPx`.
    ValidMessage {
        /// The remote node's public key
        peer_id: PeerId,
        /// Message received from the peer.
        message: PeerMessage<N>,
    },
    /// Received a message that does not match the announced capabilities of the peer.
    InvalidMessage {
        /// The remote node's public key
        peer_id: PeerId,
        /// Announced capabilities of the remote peer.
        capabilities: Arc<Capabilities>,
        /// Message received from the peer.
        message: CapabilityMessage<N>,
    },
    /// Received a bad message from the peer.
    BadMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Remote peer is considered in protocol violation
    ProtocolBreach {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Closed an incoming pending session during handshaking.
    IncomingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The pending handshake session error that caused the session to close
        error: Option<PendingSessionHandshakeError>,
    },
    /// Closed an outgoing pending session during handshaking.
    OutgoingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The pending handshake session error that caused the session to close
        error: Option<PendingSessionHandshakeError>,
    },
    /// Failed to establish a tcp stream
    OutgoingConnectionError {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The error that caused the outgoing connection to fail
        error: io::Error,
    },
    /// Session was closed due to an error
    SessionClosedOnConnectionError {
        /// The id of the remote peer.
        peer_id: PeerId,
        /// The socket we were connected to.
        remote_addr: SocketAddr,
        /// The error that caused the session to close
        error: EthStreamError,
    },
    /// Active session was gracefully disconnected.
    Disconnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address that we were connected to
        remote_addr: SocketAddr,
    },
}
767
768/// Errors that can occur during handshaking/authenticating the underlying streams.
769#[derive(Debug, thiserror::Error)]
770pub enum PendingSessionHandshakeError {
771    /// The pending session failed due to an error while establishing the `eth` stream
772    #[error(transparent)]
773    Eth(EthStreamError),
774    /// The pending session failed due to an error while establishing the ECIES stream
775    #[error(transparent)]
776    Ecies(ECIESError),
777    /// Thrown when the authentication timed out
778    #[error("authentication timed out")]
779    Timeout,
780}
781
782impl PendingSessionHandshakeError {
783    /// Returns the [`DisconnectReason`] if the error is a disconnect message
784    pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
785        match self {
786            Self::Eth(eth_err) => eth_err.as_disconnected(),
787            _ => None,
788        }
789    }
790}
791
792/// The error thrown when the max configured limit has been reached and no more connections are
793/// accepted.
794#[derive(Debug, Clone, thiserror::Error)]
795#[error("session limit reached {0}")]
796pub struct ExceedsSessionLimit(pub(crate) u32);
797
798/// Starts a pending session authentication with a timeout.
799pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
800    timeout: Duration,
801    session_id: SessionId,
802    remote_addr: SocketAddr,
803    direction: Direction,
804    events: mpsc::Sender<PendingSessionEvent<N>>,
805    f: F,
806) where
807    F: Future<Output = ()>,
808{
809    if tokio::time::timeout(timeout, f).await.is_err() {
810        trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
811        let event = PendingSessionEvent::Disconnected {
812            remote_addr,
813            session_id,
814            direction,
815            error: Some(PendingSessionHandshakeError::Timeout),
816        };
817        let _ = events.send(event).await;
818    }
819}
820
821/// Starts the authentication process for a connection initiated by a remote peer.
822///
823/// This will wait for the _incoming_ handshake request and answer it.
824#[allow(clippy::too_many_arguments)]
825pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
826    disconnect_rx: oneshot::Receiver<()>,
827    session_id: SessionId,
828    stream: TcpStream,
829    events: mpsc::Sender<PendingSessionEvent<N>>,
830    remote_addr: SocketAddr,
831    secret_key: SecretKey,
832    hello: HelloMessageWithProtocols,
833    status: Status,
834    fork_filter: ForkFilter,
835    extra_handlers: RlpxSubProtocolHandlers,
836) {
837    authenticate(
838        disconnect_rx,
839        events,
840        stream,
841        session_id,
842        remote_addr,
843        secret_key,
844        Direction::Incoming,
845        hello,
846        status,
847        fork_filter,
848        extra_handlers,
849    )
850    .await
851}
852
853/// Starts the authentication process for a connection initiated by a remote peer.
854#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")]
855#[allow(clippy::too_many_arguments)]
856async fn start_pending_outbound_session<N: NetworkPrimitives>(
857    disconnect_rx: oneshot::Receiver<()>,
858    events: mpsc::Sender<PendingSessionEvent<N>>,
859    session_id: SessionId,
860    remote_addr: SocketAddr,
861    remote_peer_id: PeerId,
862    secret_key: SecretKey,
863    hello: HelloMessageWithProtocols,
864    status: Status,
865    fork_filter: ForkFilter,
866    extra_handlers: RlpxSubProtocolHandlers,
867) {
868    let stream = match TcpStream::connect(remote_addr).await {
869        Ok(stream) => {
870            if let Err(err) = stream.set_nodelay(true) {
871                tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
872            }
873            stream
874        }
875        Err(error) => {
876            let _ = events
877                .send(PendingSessionEvent::OutgoingConnectionError {
878                    remote_addr,
879                    session_id,
880                    peer_id: remote_peer_id,
881                    error,
882                })
883                .await;
884            return
885        }
886    };
887    authenticate(
888        disconnect_rx,
889        events,
890        stream,
891        session_id,
892        remote_addr,
893        secret_key,
894        Direction::Outgoing(remote_peer_id),
895        hello,
896        status,
897        fork_filter,
898        extra_handlers,
899    )
900    .await
901}
902
903/// Authenticates a session
904#[allow(clippy::too_many_arguments)]
905async fn authenticate<N: NetworkPrimitives>(
906    disconnect_rx: oneshot::Receiver<()>,
907    events: mpsc::Sender<PendingSessionEvent<N>>,
908    stream: TcpStream,
909    session_id: SessionId,
910    remote_addr: SocketAddr,
911    secret_key: SecretKey,
912    direction: Direction,
913    hello: HelloMessageWithProtocols,
914    status: Status,
915    fork_filter: ForkFilter,
916    extra_handlers: RlpxSubProtocolHandlers,
917) {
918    let local_addr = stream.local_addr().ok();
919    let stream = match get_ecies_stream(stream, secret_key, direction).await {
920        Ok(stream) => stream,
921        Err(error) => {
922            let _ = events
923                .send(PendingSessionEvent::EciesAuthError {
924                    remote_addr,
925                    session_id,
926                    error,
927                    direction,
928                })
929                .await;
930            return
931        }
932    };
933
934    let unauthed = UnauthedP2PStream::new(stream);
935
936    let auth = authenticate_stream(
937        unauthed,
938        session_id,
939        remote_addr,
940        local_addr,
941        direction,
942        hello,
943        status,
944        fork_filter,
945        extra_handlers,
946    )
947    .boxed();
948
949    match futures::future::select(disconnect_rx, auth).await {
950        Either::Left((_, _)) => {
951            let _ = events
952                .send(PendingSessionEvent::Disconnected {
953                    remote_addr,
954                    session_id,
955                    direction,
956                    error: None,
957                })
958                .await;
959        }
960        Either::Right((res, _)) => {
961            let _ = events.send(res).await;
962        }
963    }
964}
965
966/// Returns an [`ECIESStream`] if it can be built. If not, send a
967/// [`PendingSessionEvent::EciesAuthError`] and returns `None`
968async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
969    stream: Io,
970    secret_key: SecretKey,
971    direction: Direction,
972) -> Result<ECIESStream<Io>, ECIESError> {
973    match direction {
974        Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
975        Direction::Outgoing(remote_peer_id) => {
976            ECIESStream::connect(stream, secret_key, remote_peer_id).await
977        }
978    }
979}
980
981/// Authenticate the stream via handshake
982///
983/// On Success return the authenticated stream as [`PendingSessionEvent`].
984///
985/// If additional [`RlpxSubProtocolHandlers`] are provided, the hello message will be updated to
986/// also negotiate the additional protocols.
987#[allow(clippy::too_many_arguments)]
988async fn authenticate_stream<N: NetworkPrimitives>(
989    stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
990    session_id: SessionId,
991    remote_addr: SocketAddr,
992    local_addr: Option<SocketAddr>,
993    direction: Direction,
994    mut hello: HelloMessageWithProtocols,
995    mut status: Status,
996    fork_filter: ForkFilter,
997    mut extra_handlers: RlpxSubProtocolHandlers,
998) -> PendingSessionEvent<N> {
999    // Add extra protocols to the hello message
1000    extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1001
1002    // conduct the p2p handshake and return the authenticated stream
1003    let (p2p_stream, their_hello) = match stream.handshake(hello).await {
1004        Ok(stream_res) => stream_res,
1005        Err(err) => {
1006            return PendingSessionEvent::Disconnected {
1007                remote_addr,
1008                session_id,
1009                direction,
1010                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1011            }
1012        }
1013    };
1014
1015    // Ensure we negotiated mandatory eth protocol
1016    let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1017        Ok(version) => version,
1018        Err(err) => {
1019            return PendingSessionEvent::Disconnected {
1020                remote_addr,
1021                session_id,
1022                direction,
1023                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1024            }
1025        }
1026    };
1027
1028    let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1029        // if the hello handshake was successful we can try status handshake
1030        //
1031        // Before trying status handshake, set up the version to negotiated shared version
1032        status.set_eth_version(eth_version);
1033        let eth_unauthed = UnauthedEthStream::new(p2p_stream);
1034        let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await {
1035            Ok(stream_res) => stream_res,
1036            Err(err) => {
1037                return PendingSessionEvent::Disconnected {
1038                    remote_addr,
1039                    session_id,
1040                    direction,
1041                    error: Some(PendingSessionHandshakeError::Eth(err)),
1042                }
1043            }
1044        };
1045        (eth_stream.into(), their_status)
1046    } else {
1047        // Multiplex the stream with the extra protocols
1048        let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1049
1050        // install additional handlers
1051        for handler in extra_handlers.into_iter() {
1052            let cap = handler.protocol().cap;
1053            let remote_peer_id = their_hello.id;
1054            multiplex_stream
1055                .install_protocol(&cap, move |conn| {
1056                    handler.into_connection(direction, remote_peer_id, conn)
1057                })
1058                .ok();
1059        }
1060
1061        let (multiplex_stream, their_status) =
1062            match multiplex_stream.into_eth_satellite_stream(status, fork_filter).await {
1063                Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1064                Err(err) => {
1065                    return PendingSessionEvent::Disconnected {
1066                        remote_addr,
1067                        session_id,
1068                        direction,
1069                        error: Some(PendingSessionHandshakeError::Eth(err)),
1070                    }
1071                }
1072            };
1073
1074        (multiplex_stream.into(), their_status)
1075    };
1076
1077    PendingSessionEvent::Established {
1078        session_id,
1079        remote_addr,
1080        local_addr,
1081        peer_id: their_hello.id,
1082        capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1083        status: Arc::new(their_status),
1084        conn,
1085        direction,
1086        client_id: their_hello.client_version,
1087    }
1088}