reth_network/
message.rs

1//! Capability messaging
2//!
3//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
4//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
5
6use crate::types::Receipts69;
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11    message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
12    EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock,
13    NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
14    SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use std::{
20    sync::Arc,
21    task::{ready, Context, Poll},
22};
23use tokio::sync::oneshot;
24
25/// Internal form of a `NewBlock` message
26#[derive(Debug, Clone)]
27pub struct NewBlockMessage<B = reth_ethereum_primitives::Block> {
28    /// Hash of the block
29    pub hash: B256,
30    /// Raw received message
31    pub block: Arc<NewBlock<B>>,
32}
33
34// === impl NewBlockMessage ===
35
36impl<B: reth_primitives_traits::Block> NewBlockMessage<B> {
37    /// Returns the block number of the block
38    pub fn number(&self) -> u64 {
39        self.block.block.header().number()
40    }
41}
42
43/// All Bi-directional eth-message variants that can be sent to a session or received from a
44/// session.
45#[derive(Debug)]
46pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
47    /// Announce new block hashes
48    NewBlockHashes(NewBlockHashes),
49    /// Broadcast new block.
50    NewBlock(NewBlockMessage<N::Block>),
51    /// Received transactions _from_ the peer
52    ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
53    /// Broadcast transactions _from_ local _to_ a peer.
54    SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
55    /// Send new pooled transactions
56    PooledTransactions(NewPooledTransactionHashes),
57    /// All `eth` request variants.
58    EthRequest(PeerRequest<N>),
59    /// Announces when `BlockRange` is updated.
60    BlockRangeUpdated(BlockRangeUpdate),
61    /// Any other or manually crafted eth message.
62    ///
63    /// Caution: It is expected that this is a valid `eth_` capability message.
64    Other(RawCapabilityMessage),
65}
66
67/// Request Variants that only target block related data.
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub enum BlockRequest {
70    /// Requests block headers from the peer.
71    ///
72    /// The response should be sent through the channel.
73    GetBlockHeaders(GetBlockHeaders),
74
75    /// Requests block bodies from the peer.
76    ///
77    /// The response should be sent through the channel.
78    GetBlockBodies(GetBlockBodies),
79}
80
81/// Corresponding variant for [`PeerRequest`].
82#[derive(Debug)]
83pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
84    /// Represents a response to a request for block headers.
85    BlockHeaders {
86        /// The receiver channel for the response to a block headers request.
87        response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
88    },
89    /// Represents a response to a request for block bodies.
90    BlockBodies {
91        /// The receiver channel for the response to a block bodies request.
92        response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
93    },
94    /// Represents a response to a request for pooled transactions.
95    PooledTransactions {
96        /// The receiver channel for the response to a pooled transactions request.
97        response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
98    },
99    /// Represents a response to a request for `NodeData`.
100    NodeData {
101        /// The receiver channel for the response to a `NodeData` request.
102        response: oneshot::Receiver<RequestResult<NodeData>>,
103    },
104    /// Represents a response to a request for receipts.
105    Receipts {
106        /// The receiver channel for the response to a receipts request.
107        response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
108    },
109}
110
111// === impl PeerResponse ===
112
113impl<N: NetworkPrimitives> PeerResponse<N> {
114    /// Polls the type to completion.
115    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
116        macro_rules! poll_request {
117            ($response:ident, $item:ident, $cx:ident) => {
118                match ready!($response.poll_unpin($cx)) {
119                    Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
120                    Err(err) => PeerResponseResult::$item(Err(err.into())),
121                }
122            };
123        }
124
125        let res = match self {
126            Self::BlockHeaders { response } => {
127                poll_request!(response, BlockHeaders, cx)
128            }
129            Self::BlockBodies { response } => {
130                poll_request!(response, BlockBodies, cx)
131            }
132            Self::PooledTransactions { response } => {
133                poll_request!(response, PooledTransactions, cx)
134            }
135            Self::NodeData { response } => {
136                poll_request!(response, NodeData, cx)
137            }
138            Self::Receipts { response } => {
139                poll_request!(response, Receipts, cx)
140            }
141        };
142        Poll::Ready(res)
143    }
144}
145
146/// All response variants for [`PeerResponse`]
147#[derive(Debug)]
148pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
149    /// Represents a result containing block headers or an error.
150    BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
151    /// Represents a result containing block bodies or an error.
152    BlockBodies(RequestResult<Vec<N::BlockBody>>),
153    /// Represents a result containing pooled transactions or an error.
154    PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
155    /// Represents a result containing node data or an error.
156    NodeData(RequestResult<Vec<Bytes>>),
157    /// Represents a result containing receipts or an error.
158    Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
159    /// Represents a result containing receipts or an error for eth/69.
160    Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
161}
162
163// === impl PeerResponseResult ===
164
165impl<N: NetworkPrimitives> PeerResponseResult<N> {
166    /// Converts this response into an [`EthMessage`]
167    pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
168        macro_rules! to_message {
169            ($response:ident, $item:ident, $request_id:ident) => {
170                match $response {
171                    Ok(res) => {
172                        let request = RequestPair { request_id: $request_id, message: $item(res) };
173                        Ok(EthMessage::$item(request))
174                    }
175                    Err(err) => Err(err),
176                }
177            };
178        }
179        match self {
180            Self::BlockHeaders(resp) => {
181                to_message!(resp, BlockHeaders, id)
182            }
183            Self::BlockBodies(resp) => {
184                to_message!(resp, BlockBodies, id)
185            }
186            Self::PooledTransactions(resp) => {
187                to_message!(resp, PooledTransactions, id)
188            }
189            Self::NodeData(resp) => {
190                to_message!(resp, NodeData, id)
191            }
192            Self::Receipts(resp) => {
193                to_message!(resp, Receipts, id)
194            }
195            Self::Receipts69(resp) => {
196                to_message!(resp, Receipts69, id)
197            }
198        }
199    }
200
201    /// Returns the `Err` value if the result is an error.
202    pub fn err(&self) -> Option<&RequestError> {
203        match self {
204            Self::BlockHeaders(res) => res.as_ref().err(),
205            Self::BlockBodies(res) => res.as_ref().err(),
206            Self::PooledTransactions(res) => res.as_ref().err(),
207            Self::NodeData(res) => res.as_ref().err(),
208            Self::Receipts(res) => res.as_ref().err(),
209            Self::Receipts69(res) => res.as_ref().err(),
210        }
211    }
212
213    /// Returns whether this result is an error.
214    pub fn is_err(&self) -> bool {
215        self.err().is_some()
216    }
217}