reth_network/session/handle.rs
1//! Session handles.
2
3use crate::{
4 message::PeerMessage,
5 session::{conn::EthRlpxConnection, Direction, SessionId},
6 PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10 capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
11 EthVersion, NetworkPrimitives, Status,
12};
13use reth_network_api::PeerInfo;
14use reth_network_peers::{NodeRecord, PeerId};
15use reth_network_types::PeerKind;
16use std::{io, net::SocketAddr, sync::Arc, time::Instant};
17use tokio::sync::{
18 mpsc::{self, error::SendError},
19 oneshot,
20};
21
/// A handle attached to a peer session that is not authenticated yet, i.e. one that is still
/// pending the handshake and the hello message which exchanges the `capabilities` of the peer.
///
/// This session needs to wait until it is authenticated.
#[derive(Debug)]
pub struct PendingSessionHandle {
    /// Can be used to tell the session to disconnect the connection/abort the handshake process.
    ///
    /// This is an `Option` because the sender is taken out (leaving `None`) when
    /// [`PendingSessionHandle::disconnect`] fires the signal.
    pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
    /// The direction of the session.
    pub(crate) direction: Direction,
}
33
34// === impl PendingSessionHandle ===
35
36impl PendingSessionHandle {
37 /// Sends a disconnect command to the pending session.
38 pub fn disconnect(&mut self) {
39 if let Some(tx) = self.disconnect_tx.take() {
40 let _ = tx.send(());
41 }
42 }
43
44 /// Returns the direction of the pending session (inbound or outbound).
45 pub const fn direction(&self) -> Direction {
46 self.direction
47 }
48}
49
/// An established session with a remote peer.
///
/// Within an active session that supports the `Ethereum Wire Protocol`, three high-level tasks
/// can be performed: chain synchronization, block propagation and transaction exchange.
#[derive(Debug)]
pub struct ActiveSessionHandle<N: NetworkPrimitives> {
    /// The direction of the session.
    pub(crate) direction: Direction,
    /// The assigned id for this session.
    pub(crate) session_id: SessionId,
    /// The negotiated eth version.
    pub(crate) version: EthVersion,
    /// The identifier of the remote peer.
    pub(crate) remote_id: PeerId,
    /// The timestamp when the session has been established.
    pub(crate) established: Instant,
    /// Announced capabilities of the peer.
    pub(crate) capabilities: Arc<Capabilities>,
    /// Sender half of the command channel used to send commands _to_ the spawned session.
    pub(crate) commands_to_session: mpsc::Sender<SessionCommand<N>>,
    /// The client's name and version.
    pub(crate) client_version: Arc<str>,
    /// The address we're connected to.
    pub(crate) remote_addr: SocketAddr,
    /// The local address of the connection.
    pub(crate) local_addr: Option<SocketAddr>,
    /// The Status message the peer sent for the `eth` handshake.
    pub(crate) status: Arc<Status>,
}
79
80// === impl ActiveSessionHandle ===
81
82impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
83 /// Sends a disconnect command to the session.
84 pub fn disconnect(&self, reason: Option<DisconnectReason>) {
85 // Note: we clone the sender which ensures the channel has capacity to send the message
86 let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason });
87 }
88
89 /// Sends a disconnect command to the session, awaiting the command channel for available
90 /// capacity.
91 pub async fn try_disconnect(
92 &self,
93 reason: Option<DisconnectReason>,
94 ) -> Result<(), SendError<SessionCommand<N>>> {
95 self.commands_to_session.clone().send(SessionCommand::Disconnect { reason }).await
96 }
97
98 /// Returns the direction of the active session (inbound or outbound).
99 pub const fn direction(&self) -> Direction {
100 self.direction
101 }
102
103 /// Returns the assigned session id for this session.
104 pub const fn session_id(&self) -> SessionId {
105 self.session_id
106 }
107
108 /// Returns the negotiated eth version for this session.
109 pub const fn version(&self) -> EthVersion {
110 self.version
111 }
112
113 /// Returns the identifier of the remote peer.
114 pub const fn remote_id(&self) -> PeerId {
115 self.remote_id
116 }
117
118 /// Returns the timestamp when the session has been established.
119 pub const fn established(&self) -> Instant {
120 self.established
121 }
122
123 /// Returns the announced capabilities of the peer.
124 pub fn capabilities(&self) -> Arc<Capabilities> {
125 self.capabilities.clone()
126 }
127
128 /// Returns the client's name and version.
129 pub fn client_version(&self) -> Arc<str> {
130 self.client_version.clone()
131 }
132
133 /// Returns the address we're connected to.
134 pub const fn remote_addr(&self) -> SocketAddr {
135 self.remote_addr
136 }
137
138 /// Extracts the [`PeerInfo`] from the session handle.
139 pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
140 PeerInfo {
141 remote_id: self.remote_id,
142 direction: self.direction,
143 enode: record.to_string(),
144 enr: None,
145 remote_addr: self.remote_addr,
146 local_addr: self.local_addr,
147 capabilities: self.capabilities.clone(),
148 client_version: self.client_version.clone(),
149 eth_version: self.version,
150 status: self.status.clone(),
151 session_established: self.established,
152 kind,
153 }
154 }
155}
156
/// Events a pending session can produce.
///
/// This represents the state changes a session can undergo until it is ready to send capability
/// messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
///
/// A session starts with a `Handshake`, followed by a `Hello` message which exchanges the
/// `capabilities` of the peer.
#[derive(Debug)]
pub enum PendingSessionEvent<N: NetworkPrimitives> {
    /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
    Established {
        /// An internal identifier for the established session.
        session_id: SessionId,
        /// The remote node's socket address.
        remote_addr: SocketAddr,
        /// The local address of the connection.
        local_addr: Option<SocketAddr>,
        /// The remote node's public key.
        peer_id: PeerId,
        /// All capabilities the peer announced.
        capabilities: Arc<Capabilities>,
        /// The Status message the peer sent for the `eth` handshake.
        status: Arc<Status>,
        /// The actual connection stream which can be used to send and receive `eth` protocol
        /// messages.
        conn: EthRlpxConnection<N>,
        /// The direction of the session (inbound or outbound).
        direction: Direction,
        /// The remote node's user agent, usually containing the client name and version.
        client_id: String,
    },
    /// Handshake unsuccessful, session was disconnected.
    Disconnected {
        /// The remote node's socket address.
        remote_addr: SocketAddr,
        /// The internal identifier for the disconnected session.
        session_id: SessionId,
        /// The direction of the session (inbound or outbound).
        direction: Direction,
        /// The error that caused the disconnect, if any.
        error: Option<PendingSessionHandshakeError>,
    },
    /// Emitted when unable to establish a [`TcpStream`](tokio::net::TcpStream).
    OutgoingConnectionError {
        /// The remote node's socket address.
        remote_addr: SocketAddr,
        /// The internal identifier for the disconnected session.
        session_id: SessionId,
        /// The remote node's public key.
        peer_id: PeerId,
        /// The error that caused the outgoing connection failure.
        error: io::Error,
    },
    /// Emitted when authentication via ECIES failed.
    EciesAuthError {
        /// The remote node's socket address.
        remote_addr: SocketAddr,
        /// The internal identifier for the disconnected session.
        session_id: SessionId,
        /// The error that caused the ECIES session to fail.
        error: ECIESError,
        /// The direction of the session (inbound or outbound).
        direction: Direction,
    },
}
220
/// Commands that can be sent to the spawned session.
#[derive(Debug)]
pub enum SessionCommand<N: NetworkPrimitives> {
    /// Disconnect the connection.
    Disconnect {
        /// Why the disconnect was initiated, if a reason was given.
        reason: Option<DisconnectReason>,
    },
    /// Sends a message to the peer.
    Message(PeerMessage<N>),
}
232
/// Message variants an active session can produce and send back to the
/// [`SessionManager`](crate::session::SessionManager).
#[derive(Debug)]
pub enum ActiveSessionMessage<N: NetworkPrimitives> {
    /// Session was gracefully disconnected.
    Disconnected {
        /// The remote node's public key.
        peer_id: PeerId,
        /// The remote node's socket address.
        remote_addr: SocketAddr,
    },
    /// Session was closed due to an error.
    ClosedOnConnectionError {
        /// The remote node's public key.
        peer_id: PeerId,
        /// The remote node's socket address.
        remote_addr: SocketAddr,
        /// The error that caused the session to close.
        error: EthStreamError,
    },
    /// A session received a valid message via `RLPx`.
    ValidMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
        /// Message received from the peer.
        message: PeerMessage<N>,
    },
    /// Received a message that does not match the announced capabilities of the peer.
    InvalidMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
        /// Announced capabilities of the remote peer.
        capabilities: Arc<Capabilities>,
        /// Message received from the peer.
        message: CapabilityMessage<N>,
    },
    /// Received a bad message from the peer.
    BadMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Remote peer is considered in protocol violation.
    ProtocolBreach {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
}
279}