reth_network/session/
handle.rs

1//! Session handles.
2
3use crate::{
4    message::PeerMessage,
5    session::{conn::EthRlpxConnection, Direction, SessionId},
6    PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10    capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
11    EthVersion, NetworkPrimitives, Status,
12};
13use reth_network_api::PeerInfo;
14use reth_network_peers::{NodeRecord, PeerId};
15use reth_network_types::PeerKind;
16use std::{io, net::SocketAddr, sync::Arc, time::Instant};
17use tokio::sync::{
18    mpsc::{self, error::SendError},
19    oneshot,
20};
21
22/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
23/// message which exchanges the `capabilities` of the peer.
24///
25/// This session needs to wait until it is authenticated.
26#[derive(Debug)]
27pub struct PendingSessionHandle {
28    /// Can be used to tell the session to disconnect the connection/abort the handshake process.
29    pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
30    /// The direction of the session
31    pub(crate) direction: Direction,
32}
33
34// === impl PendingSessionHandle ===
35
36impl PendingSessionHandle {
37    /// Sends a disconnect command to the pending session.
38    pub fn disconnect(&mut self) {
39        if let Some(tx) = self.disconnect_tx.take() {
40            let _ = tx.send(());
41        }
42    }
43
44    /// Returns the direction of the pending session (inbound or outbound).
45    pub const fn direction(&self) -> Direction {
46        self.direction
47    }
48}
49
50/// An established session with a remote peer.
51///
52/// Within an active session that supports the `Ethereum Wire Protocol `, three high-level tasks can
53/// be performed: chain synchronization, block propagation and transaction exchange.
54#[derive(Debug)]
55pub struct ActiveSessionHandle<N: NetworkPrimitives> {
56    /// The direction of the session
57    pub(crate) direction: Direction,
58    /// The assigned id for this session
59    pub(crate) session_id: SessionId,
60    /// negotiated eth version
61    pub(crate) version: EthVersion,
62    /// The identifier of the remote peer
63    pub(crate) remote_id: PeerId,
64    /// The timestamp when the session has been established.
65    pub(crate) established: Instant,
66    /// Announced capabilities of the peer.
67    pub(crate) capabilities: Arc<Capabilities>,
68    /// Sender half of the command channel used send commands _to_ the spawned session
69    pub(crate) commands_to_session: mpsc::Sender<SessionCommand<N>>,
70    /// The client's name and version
71    pub(crate) client_version: Arc<str>,
72    /// The address we're connected to
73    pub(crate) remote_addr: SocketAddr,
74    /// The local address of the connection.
75    pub(crate) local_addr: Option<SocketAddr>,
76    /// The Status message the peer sent for the `eth` handshake
77    pub(crate) status: Arc<Status>,
78}
79
80// === impl ActiveSessionHandle ===
81
82impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
83    /// Sends a disconnect command to the session.
84    pub fn disconnect(&self, reason: Option<DisconnectReason>) {
85        // Note: we clone the sender which ensures the channel has capacity to send the message
86        let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason });
87    }
88
89    /// Sends a disconnect command to the session, awaiting the command channel for available
90    /// capacity.
91    pub async fn try_disconnect(
92        &self,
93        reason: Option<DisconnectReason>,
94    ) -> Result<(), SendError<SessionCommand<N>>> {
95        self.commands_to_session.clone().send(SessionCommand::Disconnect { reason }).await
96    }
97
98    /// Returns the direction of the active session (inbound or outbound).
99    pub const fn direction(&self) -> Direction {
100        self.direction
101    }
102
103    /// Returns the assigned session id for this session.
104    pub const fn session_id(&self) -> SessionId {
105        self.session_id
106    }
107
108    /// Returns the negotiated eth version for this session.
109    pub const fn version(&self) -> EthVersion {
110        self.version
111    }
112
113    /// Returns the identifier of the remote peer.
114    pub const fn remote_id(&self) -> PeerId {
115        self.remote_id
116    }
117
118    /// Returns the timestamp when the session has been established.
119    pub const fn established(&self) -> Instant {
120        self.established
121    }
122
123    /// Returns the announced capabilities of the peer.
124    pub fn capabilities(&self) -> Arc<Capabilities> {
125        self.capabilities.clone()
126    }
127
128    /// Returns the client's name and version.
129    pub fn client_version(&self) -> Arc<str> {
130        self.client_version.clone()
131    }
132
133    /// Returns the address we're connected to.
134    pub const fn remote_addr(&self) -> SocketAddr {
135        self.remote_addr
136    }
137
138    /// Extracts the [`PeerInfo`] from the session handle.
139    pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
140        PeerInfo {
141            remote_id: self.remote_id,
142            direction: self.direction,
143            enode: record.to_string(),
144            enr: None,
145            remote_addr: self.remote_addr,
146            local_addr: self.local_addr,
147            capabilities: self.capabilities.clone(),
148            client_version: self.client_version.clone(),
149            eth_version: self.version,
150            status: self.status.clone(),
151            session_established: self.established,
152            kind,
153        }
154    }
155}
156
157/// Events a pending session can produce.
158///
159/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
160///
161/// A session starts with a `Handshake`, followed by a `Hello` message which
162#[derive(Debug)]
163pub enum PendingSessionEvent<N: NetworkPrimitives> {
164    /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
165    Established {
166        /// An internal identifier for the established session
167        session_id: SessionId,
168        /// The remote node's socket address
169        remote_addr: SocketAddr,
170        /// The local address of the connection
171        local_addr: Option<SocketAddr>,
172        /// The remote node's public key
173        peer_id: PeerId,
174        /// All capabilities the peer announced
175        capabilities: Arc<Capabilities>,
176        /// The Status message the peer sent for the `eth` handshake
177        status: Arc<Status>,
178        /// The actual connection stream which can be used to send and receive `eth` protocol
179        /// messages
180        conn: EthRlpxConnection<N>,
181        /// The direction of the session, either `Inbound` or `Outgoing`
182        direction: Direction,
183        /// The remote node's user agent, usually containing the client name and version
184        client_id: String,
185    },
186    /// Handshake unsuccessful, session was disconnected.
187    Disconnected {
188        /// The remote node's socket address
189        remote_addr: SocketAddr,
190        /// The internal identifier for the disconnected session
191        session_id: SessionId,
192        /// The direction of the session, either `Inbound` or `Outgoing`
193        direction: Direction,
194        /// The error that caused the disconnect
195        error: Option<PendingSessionHandshakeError>,
196    },
197    /// Thrown when unable to establish a [`TcpStream`](tokio::net::TcpStream).
198    OutgoingConnectionError {
199        /// The remote node's socket address
200        remote_addr: SocketAddr,
201        /// The internal identifier for the disconnected session
202        session_id: SessionId,
203        /// The remote node's public key
204        peer_id: PeerId,
205        /// The error that caused the outgoing connection failure
206        error: io::Error,
207    },
208    /// Thrown when authentication via ECIES failed.
209    EciesAuthError {
210        /// The remote node's socket address
211        remote_addr: SocketAddr,
212        /// The internal identifier for the disconnected session
213        session_id: SessionId,
214        /// The error that caused the ECIES session to fail
215        error: ECIESError,
216        /// The direction of the session, either `Inbound` or `Outgoing`
217        direction: Direction,
218    },
219}
220
221/// Commands that can be sent to the spawned session.
222#[derive(Debug)]
223pub enum SessionCommand<N: NetworkPrimitives> {
224    /// Disconnect the connection
225    Disconnect {
226        /// Why the disconnect was initiated
227        reason: Option<DisconnectReason>,
228    },
229    /// Sends a message to the peer
230    Message(PeerMessage<N>),
231}
232
233/// Message variants an active session can produce and send back to the
234/// [`SessionManager`](crate::session::SessionManager)
235#[derive(Debug)]
236pub enum ActiveSessionMessage<N: NetworkPrimitives> {
237    /// Session was gracefully disconnected.
238    Disconnected {
239        /// The remote node's public key
240        peer_id: PeerId,
241        /// The remote node's socket address
242        remote_addr: SocketAddr,
243    },
244    /// Session was closed due an error
245    ClosedOnConnectionError {
246        /// The remote node's public key
247        peer_id: PeerId,
248        /// The remote node's socket address
249        remote_addr: SocketAddr,
250        /// The error that caused the session to close
251        error: EthStreamError,
252    },
253    /// A session received a valid message via `RLPx`.
254    ValidMessage {
255        /// Identifier of the remote peer.
256        peer_id: PeerId,
257        /// Message received from the peer.
258        message: PeerMessage<N>,
259    },
260    /// Received a message that does not match the announced capabilities of the peer.
261    InvalidMessage {
262        /// Identifier of the remote peer.
263        peer_id: PeerId,
264        /// Announced capabilities of the remote peer.
265        capabilities: Arc<Capabilities>,
266        /// Message received from the peer.
267        message: CapabilityMessage<N>,
268    },
269    /// Received a bad message from the peer.
270    BadMessage {
271        /// Identifier of the remote peer.
272        peer_id: PeerId,
273    },
274    /// Remote peer is considered in protocol violation
275    ProtocolBreach {
276        /// Identifier of the remote peer.
277        peer_id: PeerId,
278    },
279}