reth_network/
swarm.rs

1use crate::{
2    listener::{ConnectionListener, ListenerEvent},
3    message::PeerMessage,
4    peers::InboundConnectionError,
5    protocol::IntoRlpxSubProtocol,
6    session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7    state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11    capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
12    EthNetworkPrimitives, EthVersion, NetworkPrimitives, Status,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17    io,
18    net::SocketAddr,
19    pin::Pin,
20    sync::Arc,
21    task::{Context, Poll},
22};
23use tracing::trace;
24
25#[cfg_attr(doc, aquamarine::aquamarine)]
26/// Contains the connectivity related state of the network.
27///
28/// A swarm emits [`SwarmEvent`]s when polled.
29///
30/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
31/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
32/// [`NetworkState`] and also delegated to the [`NetworkState`].
33///
34/// Following diagram displays the dataflow contained in the [`Swarm`]
35///
36/// The [`ConnectionListener`] yields incoming [`TcpStream`]s from peers that are spawned as session
37/// tasks. After a successful `RLPx` authentication, the task is ready to accept ETH requests or
38/// broadcast messages. A task listens for messages from the [`SessionManager`] which include
39/// broadcast messages like `Transactions` or internal commands, for example to disconnect the
40/// session.
41///
42/// The [`NetworkState`] keeps track of all connected and discovered peers and can initiate outgoing
43/// connections. For each active session, the [`NetworkState`] keeps a sender half of the ETH
44/// request channel for the created session and sends requests it receives from the
45/// [`StateFetcher`], which receives request objects from the client interfaces responsible for
46/// downloading headers and bodies.
47///
48/// `include_mmd!("docs/mermaid/swarm.mmd`")
49#[derive(Debug)]
50#[must_use = "Swarm does nothing unless polled"]
51pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
52    /// Listens for new incoming connections.
53    incoming: ConnectionListener,
54    /// All sessions.
55    sessions: SessionManager<N>,
56    /// Tracks the entire state of the network and handles events received from the sessions.
57    state: NetworkState<N>,
58}
59
60// === impl Swarm ===
61
62impl<N: NetworkPrimitives> Swarm<N> {
63    /// Configures a new swarm instance.
64    pub(crate) const fn new(
65        incoming: ConnectionListener,
66        sessions: SessionManager<N>,
67        state: NetworkState<N>,
68    ) -> Self {
69        Self { incoming, sessions, state }
70    }
71
72    /// Adds a protocol handler to the `RLPx` sub-protocol list.
73    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74        self.sessions_mut().add_rlpx_sub_protocol(protocol);
75    }
76
77    /// Access to the state.
78    pub(crate) const fn state(&self) -> &NetworkState<N> {
79        &self.state
80    }
81
82    /// Mutable access to the state.
83    pub(crate) fn state_mut(&mut self) -> &mut NetworkState<N> {
84        &mut self.state
85    }
86
87    /// Access to the [`ConnectionListener`].
88    pub(crate) const fn listener(&self) -> &ConnectionListener {
89        &self.incoming
90    }
91
92    /// Access to the [`SessionManager`].
93    pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94        &self.sessions
95    }
96
97    /// Mutable access to the [`SessionManager`].
98    pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99        &mut self.sessions
100    }
101}
102
103impl<N: NetworkPrimitives> Swarm<N> {
104    /// Triggers a new outgoing connection to the given node
105    pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
106        self.sessions.dial_outbound(remote_addr, remote_id)
107    }
108
109    /// Handles a polled [`SessionEvent`]
110    ///
111    /// This either updates the state or produces a new [`SwarmEvent`] that is bubbled up to the
112    /// manager.
113    fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
114        match event {
115            SessionEvent::SessionEstablished {
116                peer_id,
117                remote_addr,
118                client_version,
119                capabilities,
120                version,
121                status,
122                messages,
123                direction,
124                timeout,
125            } => {
126                self.state.on_session_activated(
127                    peer_id,
128                    capabilities.clone(),
129                    status.clone(),
130                    messages.clone(),
131                    timeout,
132                );
133                Some(SwarmEvent::SessionEstablished {
134                    peer_id,
135                    remote_addr,
136                    client_version,
137                    capabilities,
138                    version,
139                    messages,
140                    status,
141                    direction,
142                })
143            }
144            SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
145                trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
146                self.state.peers_mut().on_already_connected(direction);
147                None
148            }
149            SessionEvent::ValidMessage { peer_id, message } => {
150                Some(SwarmEvent::ValidMessage { peer_id, message })
151            }
152            SessionEvent::InvalidMessage { peer_id, capabilities, message } => {
153                Some(SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message })
154            }
155            SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
156                Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
157            }
158            SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
159                Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
160            }
161            SessionEvent::Disconnected { peer_id, remote_addr } => {
162                self.state.on_session_closed(peer_id);
163                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
164            }
165            SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
166                self.state.on_session_closed(peer_id);
167                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
168            }
169            SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
170                Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
171            }
172            SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
173            SessionEvent::ProtocolBreach { peer_id } => {
174                Some(SwarmEvent::ProtocolBreach { peer_id })
175            }
176        }
177    }
178
179    /// Callback for events produced by [`ConnectionListener`].
180    ///
181    /// Depending on the event, this will produce a new [`SwarmEvent`].
182    fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
183        match event {
184            ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
185            ListenerEvent::ListenerClosed { local_address: address } => {
186                return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
187            }
188            ListenerEvent::Incoming { stream, remote_addr } => {
189                // Reject incoming connection if node is shutting down.
190                if self.is_shutting_down() {
191                    return None
192                }
193                // ensure we can handle an incoming connection from this address
194                if let Err(err) =
195                    self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
196                {
197                    match err {
198                        InboundConnectionError::IpBanned => {
199                            trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
200                        }
201                        InboundConnectionError::ExceedsCapacity => {
202                            trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
203                            self.sessions.try_disconnect_incoming_connection(
204                                stream,
205                                DisconnectReason::TooManyPeers,
206                            );
207                        }
208                    }
209                    return None
210                }
211
212                match self.sessions.on_incoming(stream, remote_addr) {
213                    Ok(session_id) => {
214                        trace!(target: "net", ?remote_addr, "Incoming connection");
215                        return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
216                    }
217                    Err(err) => {
218                        trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
219                        self.state_mut()
220                            .peers_mut()
221                            .on_incoming_pending_session_rejected_internally();
222                    }
223                }
224            }
225        }
226        None
227    }
228
229    /// Hook for actions pulled from the state
230    fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
231        match event {
232            StateAction::Connect { remote_addr, peer_id } => {
233                self.dial_outbound(remote_addr, peer_id);
234                return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
235            }
236            StateAction::Disconnect { peer_id, reason } => {
237                self.sessions.disconnect(peer_id, reason);
238            }
239            StateAction::NewBlock { peer_id, block: msg } => {
240                let msg = PeerMessage::NewBlock(msg);
241                self.sessions.send_message(&peer_id, msg);
242            }
243            StateAction::NewBlockHashes { peer_id, hashes } => {
244                let msg = PeerMessage::NewBlockHashes(hashes);
245                self.sessions.send_message(&peer_id, msg);
246            }
247            StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
248            StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
249            StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
250                // Don't try to connect to peer if node is shutting down
251                if self.is_shutting_down() {
252                    return None
253                }
254                // Insert peer only if no fork id or a valid fork id
255                if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
256                    self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
257                }
258            }
259            StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
260                if self.sessions.is_valid_fork_id(fork_id) {
261                    self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
262                } else {
263                    self.state_mut().peers_mut().remove_peer(peer_id);
264                }
265            }
266        }
267        None
268    }
269
270    /// Set network connection state to `ShuttingDown`
271    pub(crate) fn on_shutdown_requested(&mut self) {
272        self.state_mut().peers_mut().on_shutdown();
273    }
274
275    /// Checks if the node's network connection state is '`ShuttingDown`'
276    #[inline]
277    pub(crate) const fn is_shutting_down(&self) -> bool {
278        self.state().peers().connection_state().is_shutting_down()
279    }
280
281    /// Set network connection state to `Hibernate` or `Active`
282    pub(crate) fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
283        self.state_mut().peers_mut().on_network_state_change(network_state);
284    }
285}
286
287impl<N: NetworkPrimitives> Stream for Swarm<N> {
288    type Item = SwarmEvent<N>;
289
290    /// This advances all components.
291    ///
292    /// Processes, delegates (internal) commands received from the
293    /// [`NetworkManager`](crate::NetworkManager), then polls the [`SessionManager`] which
294    /// yields messages produced by individual peer sessions that are then handled. Least
295    /// priority are incoming connections that are handled and delegated to
296    /// the [`SessionManager`] to turn them into a session.
297    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298        let this = self.get_mut();
299
300        // This loop advances the network's state prioritizing local work [NetworkState] over work
301        // coming in from the network [SessionManager], [ConnectionListener]
302        // Existing connections are prioritized over new __incoming__ connections
303        loop {
304            while let Poll::Ready(action) = this.state.poll(cx) {
305                if let Some(event) = this.on_state_action(action) {
306                    return Poll::Ready(Some(event))
307                }
308            }
309
310            // poll all sessions
311            match this.sessions.poll(cx) {
312                Poll::Pending => {}
313                Poll::Ready(event) => {
314                    if let Some(event) = this.on_session_event(event) {
315                        return Poll::Ready(Some(event))
316                    }
317                    continue
318                }
319            }
320
321            // poll listener for incoming connections
322            match Pin::new(&mut this.incoming).poll(cx) {
323                Poll::Pending => {}
324                Poll::Ready(event) => {
325                    if let Some(event) = this.on_connection(event) {
326                        return Poll::Ready(Some(event))
327                    }
328                    continue
329                }
330            }
331
332            return Poll::Pending
333        }
334    }
335}
336
337/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
338/// network.
339pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
340    /// Events related to the actual network protocol.
341    ValidMessage {
342        /// The peer that sent the message
343        peer_id: PeerId,
344        /// Message received from the peer
345        message: PeerMessage<N>,
346    },
347    /// Received a message that does not match the announced capabilities of the peer.
348    InvalidCapabilityMessage {
349        peer_id: PeerId,
350        /// Announced capabilities of the remote peer.
351        capabilities: Arc<Capabilities>,
352        /// Message received from the peer.
353        message: CapabilityMessage<N>,
354    },
355    /// Received a bad message from the peer.
356    BadMessage {
357        /// Identifier of the remote peer.
358        peer_id: PeerId,
359    },
360    /// Remote peer is considered in protocol violation
361    ProtocolBreach {
362        /// Identifier of the remote peer.
363        peer_id: PeerId,
364    },
365    /// The underlying tcp listener closed.
366    TcpListenerClosed {
367        /// Address of the closed listener.
368        remote_addr: SocketAddr,
369    },
370    /// The underlying tcp listener encountered an error that we bubble up.
371    TcpListenerError(io::Error),
372    /// Received an incoming tcp connection.
373    ///
374    /// This represents the first step in the session authentication process. The swarm will
375    /// produce subsequent events once the stream has been authenticated, or was rejected.
376    IncomingTcpConnection {
377        /// The internal session identifier under which this connection is currently tracked.
378        session_id: SessionId,
379        /// Address of the remote peer.
380        remote_addr: SocketAddr,
381    },
382    /// An outbound connection is initiated.
383    OutgoingTcpConnection {
384        /// Address of the remote peer.
385        peer_id: PeerId,
386        remote_addr: SocketAddr,
387    },
388    SessionEstablished {
389        peer_id: PeerId,
390        remote_addr: SocketAddr,
391        client_version: Arc<str>,
392        capabilities: Arc<Capabilities>,
393        /// negotiated eth version
394        version: EthVersion,
395        messages: PeerRequestSender<PeerRequest<N>>,
396        status: Arc<Status>,
397        direction: Direction,
398    },
399    SessionClosed {
400        peer_id: PeerId,
401        remote_addr: SocketAddr,
402        /// Whether the session was closed due to an error
403        error: Option<EthStreamError>,
404    },
405    /// Admin rpc: new peer added
406    PeerAdded(PeerId),
407    /// Admin rpc: peer removed
408    PeerRemoved(PeerId),
409    /// Closed an incoming pending session during authentication.
410    IncomingPendingSessionClosed {
411        remote_addr: SocketAddr,
412        error: Option<PendingSessionHandshakeError>,
413    },
414    /// Closed an outgoing pending session during authentication.
415    OutgoingPendingSessionClosed {
416        remote_addr: SocketAddr,
417        peer_id: PeerId,
418        error: Option<PendingSessionHandshakeError>,
419    },
420    /// Failed to establish a tcp stream to the given address/node
421    OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
422}
423
/// Represents the state of the node's network connectivity.
///
/// - [`Active`](Self::Active): normal operation; new outbound connections are established.
/// - [`ShuttingDown`](Self::ShuttingDown): the node is shutting down; new connections won't be
///   established.
/// - [`Hibernate`](Self::Hibernate): the node will not initiate new outbound connections. This is
///   beneficial for sync stages that do not require a network connection.
///
/// This is a plain fieldless enum, so it is cheap to copy and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum NetworkConnectionState {
    /// Node is active, new outbound connections will be established.
    #[default]
    Active,
    /// Node is shutting down, no new outbound connections will be established.
    ShuttingDown,
    /// Hibernate network connection, no new outbound connections will be established.
    Hibernate,
}
438
439impl NetworkConnectionState {
440    /// Returns true if the node is active.
441    pub(crate) const fn is_active(&self) -> bool {
442        matches!(self, Self::Active)
443    }
444
445    /// Returns true if the node is shutting down.
446    pub(crate) const fn is_shutting_down(&self) -> bool {
447        matches!(self, Self::ShuttingDown)
448    }
449}