1use reth_eth_wire_types::{
4 message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
5 EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
6 GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts,
7 Status,
8};
9use reth_ethereum_forks::ForkId;
10use reth_network_p2p::error::{RequestError, RequestResult};
11use reth_network_peers::PeerId;
12use reth_network_types::PeerAddr;
13use reth_tokio_util::EventStream;
14use std::{
15 fmt,
16 net::SocketAddr,
17 pin::Pin,
18 sync::Arc,
19 task::{Context, Poll},
20};
21use tokio::sync::{mpsc, oneshot};
22use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
23
/// A type-erased stream of [`PeerEvent`]s.
///
/// Wraps a pinned, boxed `dyn Stream<Item = PeerEvent> + Send + Sync`, so users of this type do
/// not have to name the concrete stream type that produces the events.
pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);
26
27impl fmt::Debug for PeerEventStream {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.debug_struct("PeerEventStream").finish_non_exhaustive()
30 }
31}
32
33impl PeerEventStream {
34 pub fn new<S, T>(stream: S) -> Self
37 where
38 S: Stream<Item = T> + Send + Sync + 'static,
39 T: Into<PeerEvent> + 'static,
40 {
41 let mapped_stream = stream.map(Into::into);
42 Self(Box::pin(mapped_stream))
43 }
44}
45
46impl Stream for PeerEventStream {
47 type Item = PeerEvent;
48
49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 self.0.as_mut().poll_next(cx)
51 }
52}
53
/// Information about a peer session.
///
/// Carried by [`PeerEvent::SessionEstablished`] and [`NetworkEvent::ActivePeerSession`].
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// Identifier of the remote peer.
    pub peer_id: PeerId,
    /// Socket address of the remote peer.
    pub remote_addr: SocketAddr,
    /// Client version string (presumably the one announced by the remote peer — confirm).
    pub client_version: Arc<str>,
    /// Capabilities associated with the session (presumably those announced by the peer —
    /// confirm).
    pub capabilities: Arc<Capabilities>,
    /// [`Status`] associated with the session (presumably the status received from the peer
    /// during the handshake — confirm).
    pub status: Arc<Status>,
    /// `eth` protocol version of the session (presumably the negotiated one — confirm).
    pub version: EthVersion,
}
70
/// Peer-level events.
///
/// A [`NetworkEvent`] can be converted into a `PeerEvent` through its `From` implementation.
#[derive(Debug, Clone)]
pub enum PeerEvent {
    /// A session with a peer was closed.
    SessionClosed {
        /// Identifier of the peer whose session was closed.
        peer_id: PeerId,
        /// The disconnect reason, if one is available.
        reason: Option<DisconnectReason>,
    },
    /// A session was established; carries the [`SessionInfo`] describing it.
    SessionEstablished(SessionInfo),
    /// A peer was added (presumably to the set of tracked peers — confirm with the emitter).
    PeerAdded(PeerId),
    /// A peer was removed (presumably from the set of tracked peers — confirm with the emitter).
    PeerRemoved(PeerId),
}
92
/// Network events, generic over the request type `R` that can be sent to a peer session.
///
/// Compared to [`PeerEvent`], the session variant additionally carries a [`PeerRequestSender`].
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
    /// A plain [`PeerEvent`].
    Peer(PeerEvent),
    /// A session together with a handle for sending requests.
    ///
    /// Converting this variant into a [`PeerEvent`] yields [`PeerEvent::SessionEstablished`] and
    /// discards the sender.
    ActivePeerSession {
        /// Information about the session.
        info: SessionInfo,
        /// Handle for sending requests of type `R` (presumably to this peer's session — confirm).
        messages: PeerRequestSender<R>,
    },
}
106
107impl<R> Clone for NetworkEvent<R> {
108 fn clone(&self) -> Self {
109 match self {
110 Self::Peer(event) => Self::Peer(event.clone()),
111 Self::ActivePeerSession { info, messages } => {
112 Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
113 }
114 }
115 }
116}
117
118impl<R> From<NetworkEvent<R>> for PeerEvent {
119 fn from(event: NetworkEvent<R>) -> Self {
120 match event {
121 NetworkEvent::Peer(peer_event) => peer_event,
122 NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
123 }
124 }
125}
126
/// Provides access to a stream of [`PeerEvent`]s.
///
/// NOTE(review): the `auto_impl` attribute is expected to also generate implementations of this
/// trait for `&T` and `Arc<T>` — confirm against the `auto_impl` crate documentation.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkPeersEvents: Send + Sync {
    /// Returns a [`PeerEventStream`] of peer events.
    fn peer_events(&self) -> PeerEventStream;
}
133
/// Provides listeners for network and discovery events, extending [`NetworkPeersEvents`].
///
/// NOTE(review): the `auto_impl` attribute is expected to also generate implementations of this
/// trait for `&T` and `Arc<T>` — confirm against the `auto_impl` crate documentation.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider: NetworkPeersEvents {
    /// The [`NetworkPrimitives`] that parameterize the [`PeerRequest`] type of the emitted
    /// [`NetworkEvent`]s.
    type Primitives: NetworkPrimitives;

    /// Returns an [`EventStream`] of [`NetworkEvent`]s.
    fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>>;
    /// Returns a stream of [`DiscoveryEvent`]s.
    fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}
147
/// Events related to peer discovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
    /// A new node event, described by the wrapped [`DiscoveredEvent`].
    NewNode(DiscoveredEvent),
    /// A [`ForkId`] associated with a peer (presumably read from the peer's ENR, going by the
    /// variant name — confirm).
    EnrForkId(PeerId, ForkId),
}
156
/// Details about a discovered node; wrapped by [`DiscoveryEvent::NewNode`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveredEvent {
    /// NOTE(review): presumably signals that the discovered peer was queued for further
    /// processing — confirm with the code that produces this event.
    EventQueued {
        /// Identifier of the discovered peer.
        peer_id: PeerId,
        /// Address of the discovered peer.
        addr: PeerAddr,
        /// The [`ForkId`] of the peer, if one is available.
        fork_id: Option<ForkId>,
    },
}
181
/// Requests that can be sent to a peer, generic over the [`NetworkPrimitives`] `N`.
///
/// Every variant pairs the request payload with a [`oneshot::Sender`] on which the corresponding
/// [`RequestResult`] is delivered.
#[derive(Debug)]
pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Request for block headers.
    GetBlockHeaders {
        /// The request payload.
        request: GetBlockHeaders,
        /// Channel on which the [`BlockHeaders`] result is sent.
        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Request for block bodies.
    GetBlockBodies {
        /// The request payload.
        request: GetBlockBodies,
        /// Channel on which the [`BlockBodies`] result is sent.
        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Request for pooled transactions.
    GetPooledTransactions {
        /// The request payload.
        request: GetPooledTransactions,
        /// Channel on which the [`PooledTransactions`] result is sent.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Request for node data.
    GetNodeData {
        /// The request payload.
        request: GetNodeData,
        /// Channel on which the [`NodeData`] result is sent.
        response: oneshot::Sender<RequestResult<NodeData>>,
    },
    /// Request for receipts.
    GetReceipts {
        /// The request payload.
        request: GetReceipts,
        /// Channel on which the [`Receipts`] result is sent.
        response: oneshot::Sender<RequestResult<Receipts>>,
    },
}
231
232impl<N: NetworkPrimitives> PeerRequest<N> {
235 pub fn send_bad_response(self) {
237 self.send_err_response(RequestError::BadResponse)
238 }
239
240 pub fn send_err_response(self, err: RequestError) {
242 let _ = match self {
243 Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
244 Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
245 Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
246 Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
247 Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
248 };
249 }
250
251 pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
253 match self {
254 Self::GetBlockHeaders { request, .. } => {
255 EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
256 }
257 Self::GetBlockBodies { request, .. } => {
258 EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
259 }
260 Self::GetPooledTransactions { request, .. } => {
261 EthMessage::GetPooledTransactions(RequestPair {
262 request_id,
263 message: request.clone(),
264 })
265 }
266 Self::GetNodeData { request, .. } => {
267 EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
268 }
269 Self::GetReceipts { request, .. } => {
270 EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
271 }
272 }
273 }
274
275 pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
277 match self {
278 Self::GetPooledTransactions { request, .. } => Some(request),
279 _ => None,
280 }
281 }
282}
283
/// A handle for sending requests of type `R` over an [`mpsc::Sender`], tagged with a [`PeerId`].
///
/// `Clone` and `Debug` are implemented by hand for this type, without any bound on `R`.
pub struct PeerRequestSender<R = PeerRequest> {
    /// Identifier of the peer this sender is associated with.
    pub peer_id: PeerId,
    /// Sending half of the channel that carries the requests (presumably received by the peer's
    /// session, going by the field name — confirm).
    pub to_session_tx: mpsc::Sender<R>,
}
291
292impl<R> Clone for PeerRequestSender<R> {
293 fn clone(&self) -> Self {
294 Self { peer_id: self.peer_id, to_session_tx: self.to_session_tx.clone() }
295 }
296}
297
298impl<R> PeerRequestSender<R> {
301 pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<R>) -> Self {
303 Self { peer_id, to_session_tx }
304 }
305
306 pub fn try_send(&self, req: R) -> Result<(), mpsc::error::TrySendError<R>> {
308 self.to_session_tx.try_send(req)
309 }
310
311 pub const fn peer_id(&self) -> &PeerId {
313 &self.peer_id
314 }
315}
316
317impl<R> fmt::Debug for PeerRequestSender<R> {
318 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319 f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
320 }
321}