1mod active;
4mod conn;
5mod counter;
6mod handle;
7
8use active::QueuedOutgoingMessages;
9pub use conn::EthRlpxConnection;
10pub use handle::{
11 ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
12 SessionCommand,
13};
14
15pub use reth_network_api::{Direction, PeerInfo};
16
17use std::{
18 collections::HashMap,
19 future::Future,
20 net::SocketAddr,
21 sync::{atomic::AtomicU64, Arc},
22 task::{Context, Poll},
23 time::{Duration, Instant},
24};
25
26use crate::{
27 message::PeerMessage,
28 metrics::SessionManagerMetrics,
29 protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols},
30 session::active::ActiveSession,
31};
32use counter::SessionCounter;
33use futures::{future::Either, io, FutureExt, StreamExt};
34use reth_ecies::{stream::ECIESStream, ECIESError};
35use reth_eth_wire::{
36 capability::CapabilityMessage, errors::EthStreamError, multiplex::RlpxProtocolMultiplexer,
37 Capabilities, DisconnectReason, EthVersion, HelloMessageWithProtocols, NetworkPrimitives,
38 Status, UnauthedEthStream, UnauthedP2PStream,
39};
40use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
41use reth_metrics::common::mpsc::MeteredPollSender;
42use reth_network_api::{PeerRequest, PeerRequestSender};
43use reth_network_peers::PeerId;
44use reth_network_types::SessionsConfig;
45use reth_tasks::TaskSpawner;
46use rustc_hash::FxHashMap;
47use secp256k1::SecretKey;
48use tokio::{
49 io::{AsyncRead, AsyncWrite},
50 net::TcpStream,
51 sync::{mpsc, mpsc::error::TrySendError, oneshot},
52};
53use tokio_stream::wrappers::ReceiverStream;
54use tokio_util::sync::PollSender;
55use tracing::{debug, instrument, trace};
56
57#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
59pub struct SessionId(usize);
60
61#[must_use = "Session Manager must be polled to process session events."]
63#[derive(Debug)]
64pub struct SessionManager<N: NetworkPrimitives> {
65 next_id: usize,
67 counter: SessionCounter,
69 initial_internal_request_timeout: Duration,
72 protocol_breach_request_timeout: Duration,
75 pending_session_timeout: Duration,
77 secret_key: SecretKey,
79 status: Status,
81 hello_message: HelloMessageWithProtocols,
83 fork_filter: ForkFilter,
85 session_command_buffer: usize,
87 executor: Box<dyn TaskSpawner>,
89 pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
94 active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
96 pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
101 pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
103 active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
108 active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
110 extra_protocols: RlpxSubProtocols,
112 disconnections_counter: DisconnectionsCounter,
114 metrics: SessionManagerMetrics,
116}
117
118impl<N: NetworkPrimitives> SessionManager<N> {
121 #[allow(clippy::too_many_arguments)]
123 pub fn new(
124 secret_key: SecretKey,
125 config: SessionsConfig,
126 executor: Box<dyn TaskSpawner>,
127 status: Status,
128 hello_message: HelloMessageWithProtocols,
129 fork_filter: ForkFilter,
130 extra_protocols: RlpxSubProtocols,
131 ) -> Self {
132 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
133 let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
134 let active_session_tx = PollSender::new(active_session_tx);
135
136 Self {
137 next_id: 0,
138 counter: SessionCounter::new(config.limits),
139 initial_internal_request_timeout: config.initial_internal_request_timeout,
140 protocol_breach_request_timeout: config.protocol_breach_request_timeout,
141 pending_session_timeout: config.pending_session_timeout,
142 secret_key,
143 status,
144 hello_message,
145 fork_filter,
146 session_command_buffer: config.session_command_buffer,
147 executor,
148 pending_sessions: Default::default(),
149 active_sessions: Default::default(),
150 pending_sessions_tx,
151 pending_session_rx: ReceiverStream::new(pending_sessions_rx),
152 active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
153 active_session_rx: ReceiverStream::new(active_session_rx),
154 extra_protocols,
155 disconnections_counter: Default::default(),
156 metrics: Default::default(),
157 }
158 }
159
160 pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
163 self.fork_filter.validate(fork_id).is_ok()
164 }
165
166 fn next_id(&mut self) -> SessionId {
168 let id = self.next_id;
169 self.next_id += 1;
170 SessionId(id)
171 }
172
173 pub const fn status(&self) -> Status {
175 self.status
176 }
177
178 pub const fn secret_key(&self) -> SecretKey {
180 self.secret_key
181 }
182
183 pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
185 &self.active_sessions
186 }
187
188 pub fn hello_message(&self) -> HelloMessageWithProtocols {
190 self.hello_message.clone()
191 }
192
193 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
195 self.extra_protocols.push(protocol)
196 }
197
198 #[inline]
200 pub(crate) fn num_pending_connections(&self) -> usize {
201 self.pending_sessions.len()
202 }
203
204 fn spawn<F>(&self, f: F)
207 where
208 F: Future<Output = ()> + Send + 'static,
209 {
210 self.executor.spawn(f.boxed());
211 }
212
213 pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
218 self.status.blockhash = head.hash;
219 self.status.total_difficulty = head.total_difficulty;
220 let transition = self.fork_filter.set_head(head);
221 self.status.forkid = self.fork_filter.current();
222 transition
223 }
224
225 pub(crate) fn on_incoming(
230 &mut self,
231 stream: TcpStream,
232 remote_addr: SocketAddr,
233 ) -> Result<SessionId, ExceedsSessionLimit> {
234 self.counter.ensure_pending_inbound()?;
235
236 let session_id = self.next_id();
237
238 trace!(
239 target: "net::session",
240 ?remote_addr,
241 ?session_id,
242 "new pending incoming session"
243 );
244
245 let (disconnect_tx, disconnect_rx) = oneshot::channel();
246 let pending_events = self.pending_sessions_tx.clone();
247 let secret_key = self.secret_key;
248 let hello_message = self.hello_message.clone();
249 let status = self.status;
250 let fork_filter = self.fork_filter.clone();
251 let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
252 self.spawn(pending_session_with_timeout(
253 self.pending_session_timeout,
254 session_id,
255 remote_addr,
256 Direction::Incoming,
257 pending_events.clone(),
258 start_pending_incoming_session(
259 disconnect_rx,
260 session_id,
261 stream,
262 pending_events,
263 remote_addr,
264 secret_key,
265 hello_message,
266 status,
267 fork_filter,
268 extra_handlers,
269 ),
270 ));
271
272 let handle = PendingSessionHandle {
273 disconnect_tx: Some(disconnect_tx),
274 direction: Direction::Incoming,
275 };
276 self.pending_sessions.insert(session_id, handle);
277 self.counter.inc_pending_inbound();
278 Ok(session_id)
279 }
280
281 pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
283 if self.counter.ensure_pending_outbound().is_ok() {
285 let session_id = self.next_id();
286 let (disconnect_tx, disconnect_rx) = oneshot::channel();
287 let pending_events = self.pending_sessions_tx.clone();
288 let secret_key = self.secret_key;
289 let hello_message = self.hello_message.clone();
290 let fork_filter = self.fork_filter.clone();
291 let status = self.status;
292 let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
293 self.spawn(pending_session_with_timeout(
294 self.pending_session_timeout,
295 session_id,
296 remote_addr,
297 Direction::Outgoing(remote_peer_id),
298 pending_events.clone(),
299 start_pending_outbound_session(
300 disconnect_rx,
301 pending_events,
302 session_id,
303 remote_addr,
304 remote_peer_id,
305 secret_key,
306 hello_message,
307 status,
308 fork_filter,
309 extra_handlers,
310 ),
311 ));
312
313 let handle = PendingSessionHandle {
314 disconnect_tx: Some(disconnect_tx),
315 direction: Direction::Outgoing(remote_peer_id),
316 };
317 self.pending_sessions.insert(session_id, handle);
318 self.counter.inc_pending_outbound();
319 }
320 }
321
322 pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
327 if let Some(session) = self.active_sessions.get(&node) {
328 session.disconnect(reason);
329 }
330 }
331
332 pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
337 for session in self.active_sessions.values() {
338 session.disconnect(reason);
339 }
340 }
341
342 pub fn disconnect_all_pending(&mut self) {
344 for session in self.pending_sessions.values_mut() {
345 session.disconnect();
346 }
347 }
348
349 pub fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage<N>) {
351 if let Some(session) = self.active_sessions.get_mut(peer_id) {
352 let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)).inspect_err(
353 |e| {
354 if let TrySendError::Full(_) = e {
355 debug!(
356 target: "net::session",
357 ?peer_id,
358 "session command buffer full, dropping message"
359 );
360 self.metrics.total_outgoing_peer_messages_dropped.increment(1);
361 }
362 },
363 );
364 }
365 }
366
367 fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
369 let session = self.pending_sessions.remove(id)?;
370 self.counter.dec_pending(&session.direction);
371 Some(session)
372 }
373
374 fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
376 let session = self.active_sessions.remove(id)?;
377 self.counter.dec_active(&session.direction);
378 Some(session)
379 }
380
381 pub(crate) fn try_disconnect_incoming_connection(
385 &self,
386 stream: TcpStream,
387 reason: DisconnectReason,
388 ) {
389 if !self.disconnections_counter.has_capacity() {
390 return
392 }
393
394 let guard = self.disconnections_counter.clone();
395 let secret_key = self.secret_key;
396
397 self.spawn(async move {
398 trace!(
399 target: "net::session",
400 "gracefully disconnecting incoming connection"
401 );
402 if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
403 let mut unauth = UnauthedP2PStream::new(stream);
404 let _ = unauth.send_disconnect(reason).await;
405 drop(guard);
406 }
407 });
408 }
409
410 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
414 match self.active_session_rx.poll_next_unpin(cx) {
416 Poll::Pending => {}
417 Poll::Ready(None) => {
418 unreachable!("Manager holds both channel halves.")
419 }
420 Poll::Ready(Some(event)) => {
421 return match event {
422 ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
423 trace!(
424 target: "net::session",
425 ?peer_id,
426 "gracefully disconnected active session."
427 );
428 self.remove_active_session(&peer_id);
429 Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
430 }
431 ActiveSessionMessage::ClosedOnConnectionError {
432 peer_id,
433 remote_addr,
434 error,
435 } => {
436 trace!(target: "net::session", ?peer_id, %error,"closed session.");
437 self.remove_active_session(&peer_id);
438 Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
439 remote_addr,
440 peer_id,
441 error,
442 })
443 }
444 ActiveSessionMessage::ValidMessage { peer_id, message } => {
445 Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
446 }
447 ActiveSessionMessage::InvalidMessage { peer_id, capabilities, message } => {
448 Poll::Ready(SessionEvent::InvalidMessage { peer_id, message, capabilities })
449 }
450 ActiveSessionMessage::BadMessage { peer_id } => {
451 Poll::Ready(SessionEvent::BadMessage { peer_id })
452 }
453 ActiveSessionMessage::ProtocolBreach { peer_id } => {
454 Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
455 }
456 }
457 }
458 }
459
460 let event = match self.pending_session_rx.poll_next_unpin(cx) {
462 Poll::Pending => return Poll::Pending,
463 Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
464 Poll::Ready(Some(event)) => event,
465 };
466 match event {
467 PendingSessionEvent::Established {
468 session_id,
469 remote_addr,
470 local_addr,
471 peer_id,
472 capabilities,
473 conn,
474 status,
475 direction,
476 client_id,
477 } => {
478 self.remove_pending_session(&session_id);
480
481 if self.active_sessions.contains_key(&peer_id) {
483 trace!(
484 target: "net::session",
485 ?session_id,
486 ?remote_addr,
487 ?peer_id,
488 ?direction,
489 "already connected"
490 );
491
492 self.spawn(async move {
493 let _ =
495 conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
496 });
497
498 return Poll::Ready(SessionEvent::AlreadyConnected {
499 peer_id,
500 remote_addr,
501 direction,
502 })
503 }
504
505 let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer);
506
507 let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
508
509 let messages = PeerRequestSender::new(peer_id, to_session_tx);
510
511 let timeout = Arc::new(AtomicU64::new(
512 self.initial_internal_request_timeout.as_millis() as u64,
513 ));
514
515 let version = conn.version();
517
518 let session = ActiveSession {
519 next_id: 0,
520 remote_peer_id: peer_id,
521 remote_addr,
522 remote_capabilities: Arc::clone(&capabilities),
523 session_id,
524 commands_rx: ReceiverStream::new(commands_rx),
525 to_session_manager: self.active_session_tx.clone(),
526 pending_message_to_session: None,
527 internal_request_tx: ReceiverStream::new(messages_rx).fuse(),
528 inflight_requests: Default::default(),
529 conn,
530 queued_outgoing: QueuedOutgoingMessages::new(
531 self.metrics.queued_outgoing_messages.clone(),
532 ),
533 received_requests_from_remote: Default::default(),
534 internal_request_timeout_interval: tokio::time::interval(
535 self.initial_internal_request_timeout,
536 ),
537 internal_request_timeout: Arc::clone(&timeout),
538 protocol_breach_request_timeout: self.protocol_breach_request_timeout,
539 terminate_message: None,
540 };
541
542 self.spawn(session);
543
544 let client_version = client_id.into();
545 let handle = ActiveSessionHandle {
546 status: status.clone(),
547 direction,
548 session_id,
549 remote_id: peer_id,
550 version,
551 established: Instant::now(),
552 capabilities: Arc::clone(&capabilities),
553 commands_to_session,
554 client_version: Arc::clone(&client_version),
555 remote_addr,
556 local_addr,
557 };
558
559 self.active_sessions.insert(peer_id, handle);
560 self.counter.inc_active(&direction);
561
562 if direction.is_outgoing() {
563 self.metrics.total_dial_successes.increment(1);
564 }
565
566 Poll::Ready(SessionEvent::SessionEstablished {
567 peer_id,
568 remote_addr,
569 client_version,
570 version,
571 capabilities,
572 status,
573 messages,
574 direction,
575 timeout,
576 })
577 }
578 PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
579 trace!(
580 target: "net::session",
581 ?session_id,
582 ?remote_addr,
583 ?error,
584 "disconnected pending session"
585 );
586 self.remove_pending_session(&session_id);
587 match direction {
588 Direction::Incoming => {
589 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
590 remote_addr,
591 error,
592 })
593 }
594 Direction::Outgoing(peer_id) => {
595 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
596 remote_addr,
597 peer_id,
598 error,
599 })
600 }
601 }
602 }
603 PendingSessionEvent::OutgoingConnectionError {
604 remote_addr,
605 session_id,
606 peer_id,
607 error,
608 } => {
609 trace!(
610 target: "net::session",
611 %error,
612 ?session_id,
613 ?remote_addr,
614 ?peer_id,
615 "connection refused"
616 );
617 self.remove_pending_session(&session_id);
618 Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
619 }
620 PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
621 trace!(
622 target: "net::session",
623 %error,
624 ?session_id,
625 ?remote_addr,
626 "ecies auth failed"
627 );
628 self.remove_pending_session(&session_id);
629 match direction {
630 Direction::Incoming => {
631 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
632 remote_addr,
633 error: Some(PendingSessionHandshakeError::Ecies(error)),
634 })
635 }
636 Direction::Outgoing(peer_id) => {
637 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
638 remote_addr,
639 peer_id,
640 error: Some(PendingSessionHandshakeError::Ecies(error)),
641 })
642 }
643 }
644 }
645 }
646 }
647}
648
649#[derive(Default, Debug, Clone)]
651struct DisconnectionsCounter(Arc<()>);
652
653impl DisconnectionsCounter {
654 const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;
655
656 fn has_capacity(&self) -> bool {
659 Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
660 }
661}
662
663#[derive(Debug)]
665pub enum SessionEvent<N: NetworkPrimitives> {
666 SessionEstablished {
670 peer_id: PeerId,
672 remote_addr: SocketAddr,
674 client_version: Arc<str>,
676 capabilities: Arc<Capabilities>,
678 version: EthVersion,
680 status: Arc<Status>,
682 messages: PeerRequestSender<PeerRequest<N>>,
684 direction: Direction,
686 timeout: Arc<AtomicU64>,
689 },
690 AlreadyConnected {
692 peer_id: PeerId,
694 remote_addr: SocketAddr,
696 direction: Direction,
698 },
699 ValidMessage {
701 peer_id: PeerId,
703 message: PeerMessage<N>,
705 },
706 InvalidMessage {
708 peer_id: PeerId,
710 capabilities: Arc<Capabilities>,
712 message: CapabilityMessage<N>,
714 },
715 BadMessage {
717 peer_id: PeerId,
719 },
720 ProtocolBreach {
722 peer_id: PeerId,
724 },
725 IncomingPendingSessionClosed {
727 remote_addr: SocketAddr,
729 error: Option<PendingSessionHandshakeError>,
731 },
732 OutgoingPendingSessionClosed {
734 remote_addr: SocketAddr,
736 peer_id: PeerId,
738 error: Option<PendingSessionHandshakeError>,
740 },
741 OutgoingConnectionError {
743 remote_addr: SocketAddr,
745 peer_id: PeerId,
747 error: io::Error,
749 },
750 SessionClosedOnConnectionError {
752 peer_id: PeerId,
754 remote_addr: SocketAddr,
756 error: EthStreamError,
758 },
759 Disconnected {
761 peer_id: PeerId,
763 remote_addr: SocketAddr,
765 },
766}
767
768#[derive(Debug, thiserror::Error)]
770pub enum PendingSessionHandshakeError {
771 #[error(transparent)]
773 Eth(EthStreamError),
774 #[error(transparent)]
776 Ecies(ECIESError),
777 #[error("authentication timed out")]
779 Timeout,
780}
781
782impl PendingSessionHandshakeError {
783 pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
785 match self {
786 Self::Eth(eth_err) => eth_err.as_disconnected(),
787 _ => None,
788 }
789 }
790}
791
792#[derive(Debug, Clone, thiserror::Error)]
795#[error("session limit reached {0}")]
796pub struct ExceedsSessionLimit(pub(crate) u32);
797
798pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
800 timeout: Duration,
801 session_id: SessionId,
802 remote_addr: SocketAddr,
803 direction: Direction,
804 events: mpsc::Sender<PendingSessionEvent<N>>,
805 f: F,
806) where
807 F: Future<Output = ()>,
808{
809 if tokio::time::timeout(timeout, f).await.is_err() {
810 trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
811 let event = PendingSessionEvent::Disconnected {
812 remote_addr,
813 session_id,
814 direction,
815 error: Some(PendingSessionHandshakeError::Timeout),
816 };
817 let _ = events.send(event).await;
818 }
819}
820
821#[allow(clippy::too_many_arguments)]
825pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
826 disconnect_rx: oneshot::Receiver<()>,
827 session_id: SessionId,
828 stream: TcpStream,
829 events: mpsc::Sender<PendingSessionEvent<N>>,
830 remote_addr: SocketAddr,
831 secret_key: SecretKey,
832 hello: HelloMessageWithProtocols,
833 status: Status,
834 fork_filter: ForkFilter,
835 extra_handlers: RlpxSubProtocolHandlers,
836) {
837 authenticate(
838 disconnect_rx,
839 events,
840 stream,
841 session_id,
842 remote_addr,
843 secret_key,
844 Direction::Incoming,
845 hello,
846 status,
847 fork_filter,
848 extra_handlers,
849 )
850 .await
851}
852
853#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")]
855#[allow(clippy::too_many_arguments)]
856async fn start_pending_outbound_session<N: NetworkPrimitives>(
857 disconnect_rx: oneshot::Receiver<()>,
858 events: mpsc::Sender<PendingSessionEvent<N>>,
859 session_id: SessionId,
860 remote_addr: SocketAddr,
861 remote_peer_id: PeerId,
862 secret_key: SecretKey,
863 hello: HelloMessageWithProtocols,
864 status: Status,
865 fork_filter: ForkFilter,
866 extra_handlers: RlpxSubProtocolHandlers,
867) {
868 let stream = match TcpStream::connect(remote_addr).await {
869 Ok(stream) => {
870 if let Err(err) = stream.set_nodelay(true) {
871 tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
872 }
873 stream
874 }
875 Err(error) => {
876 let _ = events
877 .send(PendingSessionEvent::OutgoingConnectionError {
878 remote_addr,
879 session_id,
880 peer_id: remote_peer_id,
881 error,
882 })
883 .await;
884 return
885 }
886 };
887 authenticate(
888 disconnect_rx,
889 events,
890 stream,
891 session_id,
892 remote_addr,
893 secret_key,
894 Direction::Outgoing(remote_peer_id),
895 hello,
896 status,
897 fork_filter,
898 extra_handlers,
899 )
900 .await
901}
902
903#[allow(clippy::too_many_arguments)]
905async fn authenticate<N: NetworkPrimitives>(
906 disconnect_rx: oneshot::Receiver<()>,
907 events: mpsc::Sender<PendingSessionEvent<N>>,
908 stream: TcpStream,
909 session_id: SessionId,
910 remote_addr: SocketAddr,
911 secret_key: SecretKey,
912 direction: Direction,
913 hello: HelloMessageWithProtocols,
914 status: Status,
915 fork_filter: ForkFilter,
916 extra_handlers: RlpxSubProtocolHandlers,
917) {
918 let local_addr = stream.local_addr().ok();
919 let stream = match get_ecies_stream(stream, secret_key, direction).await {
920 Ok(stream) => stream,
921 Err(error) => {
922 let _ = events
923 .send(PendingSessionEvent::EciesAuthError {
924 remote_addr,
925 session_id,
926 error,
927 direction,
928 })
929 .await;
930 return
931 }
932 };
933
934 let unauthed = UnauthedP2PStream::new(stream);
935
936 let auth = authenticate_stream(
937 unauthed,
938 session_id,
939 remote_addr,
940 local_addr,
941 direction,
942 hello,
943 status,
944 fork_filter,
945 extra_handlers,
946 )
947 .boxed();
948
949 match futures::future::select(disconnect_rx, auth).await {
950 Either::Left((_, _)) => {
951 let _ = events
952 .send(PendingSessionEvent::Disconnected {
953 remote_addr,
954 session_id,
955 direction,
956 error: None,
957 })
958 .await;
959 }
960 Either::Right((res, _)) => {
961 let _ = events.send(res).await;
962 }
963 }
964}
965
966async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
969 stream: Io,
970 secret_key: SecretKey,
971 direction: Direction,
972) -> Result<ECIESStream<Io>, ECIESError> {
973 match direction {
974 Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
975 Direction::Outgoing(remote_peer_id) => {
976 ECIESStream::connect(stream, secret_key, remote_peer_id).await
977 }
978 }
979}
980
981#[allow(clippy::too_many_arguments)]
988async fn authenticate_stream<N: NetworkPrimitives>(
989 stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
990 session_id: SessionId,
991 remote_addr: SocketAddr,
992 local_addr: Option<SocketAddr>,
993 direction: Direction,
994 mut hello: HelloMessageWithProtocols,
995 mut status: Status,
996 fork_filter: ForkFilter,
997 mut extra_handlers: RlpxSubProtocolHandlers,
998) -> PendingSessionEvent<N> {
999 extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1001
1002 let (p2p_stream, their_hello) = match stream.handshake(hello).await {
1004 Ok(stream_res) => stream_res,
1005 Err(err) => {
1006 return PendingSessionEvent::Disconnected {
1007 remote_addr,
1008 session_id,
1009 direction,
1010 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1011 }
1012 }
1013 };
1014
1015 let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1017 Ok(version) => version,
1018 Err(err) => {
1019 return PendingSessionEvent::Disconnected {
1020 remote_addr,
1021 session_id,
1022 direction,
1023 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1024 }
1025 }
1026 };
1027
1028 let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1029 status.set_eth_version(eth_version);
1033 let eth_unauthed = UnauthedEthStream::new(p2p_stream);
1034 let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await {
1035 Ok(stream_res) => stream_res,
1036 Err(err) => {
1037 return PendingSessionEvent::Disconnected {
1038 remote_addr,
1039 session_id,
1040 direction,
1041 error: Some(PendingSessionHandshakeError::Eth(err)),
1042 }
1043 }
1044 };
1045 (eth_stream.into(), their_status)
1046 } else {
1047 let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1049
1050 for handler in extra_handlers.into_iter() {
1052 let cap = handler.protocol().cap;
1053 let remote_peer_id = their_hello.id;
1054 multiplex_stream
1055 .install_protocol(&cap, move |conn| {
1056 handler.into_connection(direction, remote_peer_id, conn)
1057 })
1058 .ok();
1059 }
1060
1061 let (multiplex_stream, their_status) =
1062 match multiplex_stream.into_eth_satellite_stream(status, fork_filter).await {
1063 Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1064 Err(err) => {
1065 return PendingSessionEvent::Disconnected {
1066 remote_addr,
1067 session_id,
1068 direction,
1069 error: Some(PendingSessionHandshakeError::Eth(err)),
1070 }
1071 }
1072 };
1073
1074 (multiplex_stream.into(), their_status)
1075 };
1076
1077 PendingSessionEvent::Established {
1078 session_id,
1079 remote_addr,
1080 local_addr,
1081 peer_id: their_hello.id,
1082 capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1083 status: Arc::new(their_status),
1084 conn,
1085 direction,
1086 client_id: their_hello.client_version,
1087 }
1088}