1use crate::{
2 listener::{ConnectionListener, ListenerEvent},
3 message::PeerMessage,
4 peers::InboundConnectionError,
5 protocol::IntoRlpxSubProtocol,
6 session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7 state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11 capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
12 EthNetworkPrimitives, EthVersion, NetworkPrimitives, Status,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17 io,
18 net::SocketAddr,
19 pin::Pin,
20 sync::Arc,
21 task::{Context, Poll},
22};
23use tracing::trace;
24
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Bundles the three components that make up the networking layer: the
/// [`ConnectionListener`], the [`SessionManager`] and the [`NetworkState`].
///
/// The swarm is driven through its [`Stream`] implementation: polling it polls the
/// wrapped components and translates their output into [`SwarmEvent`]s.
#[derive(Debug)]
#[must_use = "Swarm does nothing unless polled"]
pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Listener that is polled for incoming connections.
    incoming: ConnectionListener,
    /// Manager for all sessions; it is asked to dial, accept and disconnect peers.
    sessions: SessionManager<N>,
    /// Network state; it yields [`StateAction`]s and is informed about session changes.
    state: NetworkState<N>,
}
59
60impl<N: NetworkPrimitives> Swarm<N> {
63 pub(crate) const fn new(
65 incoming: ConnectionListener,
66 sessions: SessionManager<N>,
67 state: NetworkState<N>,
68 ) -> Self {
69 Self { incoming, sessions, state }
70 }
71
72 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74 self.sessions_mut().add_rlpx_sub_protocol(protocol);
75 }
76
77 pub(crate) const fn state(&self) -> &NetworkState<N> {
79 &self.state
80 }
81
82 pub(crate) fn state_mut(&mut self) -> &mut NetworkState<N> {
84 &mut self.state
85 }
86
87 pub(crate) const fn listener(&self) -> &ConnectionListener {
89 &self.incoming
90 }
91
92 pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94 &self.sessions
95 }
96
97 pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99 &mut self.sessions
100 }
101}
102
103impl<N: NetworkPrimitives> Swarm<N> {
104 pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
106 self.sessions.dial_outbound(remote_addr, remote_id)
107 }
108
109 fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
114 match event {
115 SessionEvent::SessionEstablished {
116 peer_id,
117 remote_addr,
118 client_version,
119 capabilities,
120 version,
121 status,
122 messages,
123 direction,
124 timeout,
125 } => {
126 self.state.on_session_activated(
127 peer_id,
128 capabilities.clone(),
129 status.clone(),
130 messages.clone(),
131 timeout,
132 );
133 Some(SwarmEvent::SessionEstablished {
134 peer_id,
135 remote_addr,
136 client_version,
137 capabilities,
138 version,
139 messages,
140 status,
141 direction,
142 })
143 }
144 SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
145 trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
146 self.state.peers_mut().on_already_connected(direction);
147 None
148 }
149 SessionEvent::ValidMessage { peer_id, message } => {
150 Some(SwarmEvent::ValidMessage { peer_id, message })
151 }
152 SessionEvent::InvalidMessage { peer_id, capabilities, message } => {
153 Some(SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message })
154 }
155 SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
156 Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
157 }
158 SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
159 Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
160 }
161 SessionEvent::Disconnected { peer_id, remote_addr } => {
162 self.state.on_session_closed(peer_id);
163 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
164 }
165 SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
166 self.state.on_session_closed(peer_id);
167 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
168 }
169 SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
170 Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
171 }
172 SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
173 SessionEvent::ProtocolBreach { peer_id } => {
174 Some(SwarmEvent::ProtocolBreach { peer_id })
175 }
176 }
177 }
178
179 fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
183 match event {
184 ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
185 ListenerEvent::ListenerClosed { local_address: address } => {
186 return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
187 }
188 ListenerEvent::Incoming { stream, remote_addr } => {
189 if self.is_shutting_down() {
191 return None
192 }
193 if let Err(err) =
195 self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
196 {
197 match err {
198 InboundConnectionError::IpBanned => {
199 trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
200 }
201 InboundConnectionError::ExceedsCapacity => {
202 trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
203 self.sessions.try_disconnect_incoming_connection(
204 stream,
205 DisconnectReason::TooManyPeers,
206 );
207 }
208 }
209 return None
210 }
211
212 match self.sessions.on_incoming(stream, remote_addr) {
213 Ok(session_id) => {
214 trace!(target: "net", ?remote_addr, "Incoming connection");
215 return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
216 }
217 Err(err) => {
218 trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
219 self.state_mut()
220 .peers_mut()
221 .on_incoming_pending_session_rejected_internally();
222 }
223 }
224 }
225 }
226 None
227 }
228
229 fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
231 match event {
232 StateAction::Connect { remote_addr, peer_id } => {
233 self.dial_outbound(remote_addr, peer_id);
234 return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
235 }
236 StateAction::Disconnect { peer_id, reason } => {
237 self.sessions.disconnect(peer_id, reason);
238 }
239 StateAction::NewBlock { peer_id, block: msg } => {
240 let msg = PeerMessage::NewBlock(msg);
241 self.sessions.send_message(&peer_id, msg);
242 }
243 StateAction::NewBlockHashes { peer_id, hashes } => {
244 let msg = PeerMessage::NewBlockHashes(hashes);
245 self.sessions.send_message(&peer_id, msg);
246 }
247 StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
248 StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
249 StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
250 if self.is_shutting_down() {
252 return None
253 }
254 if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
256 self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
257 }
258 }
259 StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
260 if self.sessions.is_valid_fork_id(fork_id) {
261 self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
262 } else {
263 self.state_mut().peers_mut().remove_peer(peer_id);
264 }
265 }
266 }
267 None
268 }
269
270 pub(crate) fn on_shutdown_requested(&mut self) {
272 self.state_mut().peers_mut().on_shutdown();
273 }
274
275 #[inline]
277 pub(crate) const fn is_shutting_down(&self) -> bool {
278 self.state().peers().connection_state().is_shutting_down()
279 }
280
281 pub(crate) fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
283 self.state_mut().peers_mut().on_network_state_change(network_state);
284 }
285}
286
287impl<N: NetworkPrimitives> Stream for Swarm<N> {
288 type Item = SwarmEvent<N>;
289
290 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298 let this = self.get_mut();
299
300 loop {
304 while let Poll::Ready(action) = this.state.poll(cx) {
305 if let Some(event) = this.on_state_action(action) {
306 return Poll::Ready(Some(event))
307 }
308 }
309
310 match this.sessions.poll(cx) {
312 Poll::Pending => {}
313 Poll::Ready(event) => {
314 if let Some(event) = this.on_session_event(event) {
315 return Poll::Ready(Some(event))
316 }
317 continue
318 }
319 }
320
321 match Pin::new(&mut this.incoming).poll(cx) {
323 Poll::Pending => {}
324 Poll::Ready(event) => {
325 if let Some(event) = this.on_connection(event) {
326 return Poll::Ready(Some(event))
327 }
328 continue
329 }
330 }
331
332 return Poll::Pending
333 }
334 }
335}
336
/// Events yielded by polling the [`Swarm`].
///
/// Most variants are direct translations of a `SessionEvent`, `ListenerEvent` or
/// `StateAction` produced by one of the swarm's components.
pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Forwarded from `SessionEvent::ValidMessage`.
    ValidMessage {
        /// Peer the message is associated with.
        peer_id: PeerId,
        /// The message itself.
        message: PeerMessage<N>,
    },
    /// Forwarded from `SessionEvent::InvalidMessage`.
    InvalidCapabilityMessage {
        /// Peer the message is associated with.
        peer_id: PeerId,
        /// Capabilities reported alongside the message by the session manager.
        capabilities: Arc<Capabilities>,
        /// The offending capability message.
        message: CapabilityMessage<N>,
    },
    /// Forwarded from `SessionEvent::BadMessage`.
    BadMessage {
        /// Peer the event is associated with.
        peer_id: PeerId,
    },
    /// Forwarded from `SessionEvent::ProtocolBreach`.
    ProtocolBreach {
        /// Peer the event is associated with.
        peer_id: PeerId,
    },
    /// The TCP listener reported that it was closed.
    TcpListenerClosed {
        /// Despite its name, this carries the `local_address` reported by
        /// `ListenerEvent::ListenerClosed`.
        remote_addr: SocketAddr,
    },
    /// The TCP listener reported an error.
    TcpListenerError(io::Error),
    /// The session manager accepted an incoming TCP stream.
    ///
    /// Not emitted when the swarm is shutting down or when the peers manager or
    /// session manager rejected the connection.
    IncomingTcpConnection {
        /// Id the session manager returned for the accepted connection.
        session_id: SessionId,
        /// Address of the remote end of the connection.
        remote_addr: SocketAddr,
    },
    /// The swarm asked the session manager to dial a peer in response to
    /// `StateAction::Connect`.
    OutgoingTcpConnection {
        /// Peer being dialed.
        peer_id: PeerId,
        /// Address being dialed.
        remote_addr: SocketAddr,
    },
    /// A session was established; the network state has already been notified when
    /// this event is yielded. All fields are taken from
    /// `SessionEvent::SessionEstablished`.
    SessionEstablished {
        peer_id: PeerId,
        remote_addr: SocketAddr,
        client_version: Arc<str>,
        capabilities: Arc<Capabilities>,
        version: EthVersion,
        /// Request sender; the network state received a clone of it.
        messages: PeerRequestSender<PeerRequest<N>>,
        status: Arc<Status>,
        direction: Direction,
    },
    /// A session was closed; the network state has already been notified when this
    /// event is yielded.
    SessionClosed {
        peer_id: PeerId,
        remote_addr: SocketAddr,
        /// `None` for `SessionEvent::Disconnected`, `Some` for
        /// `SessionEvent::SessionClosedOnConnectionError`.
        error: Option<EthStreamError>,
    },
    /// Forwarded from `StateAction::PeerAdded`.
    PeerAdded(PeerId),
    /// Forwarded from `StateAction::PeerRemoved`.
    PeerRemoved(PeerId),
    /// Forwarded from `SessionEvent::IncomingPendingSessionClosed`.
    IncomingPendingSessionClosed {
        /// Address of the remote end of the pending session.
        remote_addr: SocketAddr,
        error: Option<PendingSessionHandshakeError>,
    },
    /// Forwarded from `SessionEvent::OutgoingPendingSessionClosed`.
    OutgoingPendingSessionClosed {
        /// Address of the remote end of the pending session.
        remote_addr: SocketAddr,
        peer_id: PeerId,
        error: Option<PendingSessionHandshakeError>,
    },
    /// Forwarded from `SessionEvent::OutgoingConnectionError`.
    OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
}
423
/// Connection state of the node, as tracked by the peers manager.
///
/// The type is a fieldless enum, so it derives the common value traits and can be
/// copied, compared and hashed by callers.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkConnectionState {
    /// Normal operation. This is the default state.
    #[default]
    Active,
    /// The node is shutting down. While in this state the swarm drops incoming
    /// connections and ignores newly discovered nodes.
    ShuttingDown,
    /// The node is hibernating.
    // NOTE(review): the effect of this state on connection handling is decided by the
    // peers manager, outside this file — confirm before relying on specifics.
    Hibernate,
}

impl NetworkConnectionState {
    /// Returns `true` if the state is [`Self::Active`].
    pub(crate) const fn is_active(&self) -> bool {
        matches!(self, Self::Active)
    }

    /// Returns `true` if the state is [`Self::ShuttingDown`].
    pub(crate) const fn is_shutting_down(&self) -> bool {
        matches!(self, Self::ShuttingDown)
    }
}