1use crate::types::Receipts69;
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11 message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
12 EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock,
13 NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
14 SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use std::{
20 sync::Arc,
21 task::{ready, Context, Poll},
22};
23use tokio::sync::oneshot;
24
25#[derive(Debug, Clone)]
27pub struct NewBlockMessage<B = reth_ethereum_primitives::Block> {
28 pub hash: B256,
30 pub block: Arc<NewBlock<B>>,
32}
33
34impl<B: reth_primitives_traits::Block> NewBlockMessage<B> {
37 pub fn number(&self) -> u64 {
39 self.block.block.header().number()
40 }
41}
42
43#[derive(Debug)]
46pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
47 NewBlockHashes(NewBlockHashes),
49 NewBlock(NewBlockMessage<N::Block>),
51 ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
53 SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
55 PooledTransactions(NewPooledTransactionHashes),
57 EthRequest(PeerRequest<N>),
59 BlockRangeUpdated(BlockRangeUpdate),
61 Other(RawCapabilityMessage),
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
69pub enum BlockRequest {
70 GetBlockHeaders(GetBlockHeaders),
74
75 GetBlockBodies(GetBlockBodies),
79}
80
81#[derive(Debug)]
83pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
84 BlockHeaders {
86 response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
88 },
89 BlockBodies {
91 response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
93 },
94 PooledTransactions {
96 response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
98 },
99 NodeData {
101 response: oneshot::Receiver<RequestResult<NodeData>>,
103 },
104 Receipts {
106 response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
108 },
109}
110
111impl<N: NetworkPrimitives> PeerResponse<N> {
114 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
116 macro_rules! poll_request {
117 ($response:ident, $item:ident, $cx:ident) => {
118 match ready!($response.poll_unpin($cx)) {
119 Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
120 Err(err) => PeerResponseResult::$item(Err(err.into())),
121 }
122 };
123 }
124
125 let res = match self {
126 Self::BlockHeaders { response } => {
127 poll_request!(response, BlockHeaders, cx)
128 }
129 Self::BlockBodies { response } => {
130 poll_request!(response, BlockBodies, cx)
131 }
132 Self::PooledTransactions { response } => {
133 poll_request!(response, PooledTransactions, cx)
134 }
135 Self::NodeData { response } => {
136 poll_request!(response, NodeData, cx)
137 }
138 Self::Receipts { response } => {
139 poll_request!(response, Receipts, cx)
140 }
141 };
142 Poll::Ready(res)
143 }
144}
145
146#[derive(Debug)]
148pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
149 BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
151 BlockBodies(RequestResult<Vec<N::BlockBody>>),
153 PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
155 NodeData(RequestResult<Vec<Bytes>>),
157 Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
159 Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
161}
162
163impl<N: NetworkPrimitives> PeerResponseResult<N> {
166 pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
168 macro_rules! to_message {
169 ($response:ident, $item:ident, $request_id:ident) => {
170 match $response {
171 Ok(res) => {
172 let request = RequestPair { request_id: $request_id, message: $item(res) };
173 Ok(EthMessage::$item(request))
174 }
175 Err(err) => Err(err),
176 }
177 };
178 }
179 match self {
180 Self::BlockHeaders(resp) => {
181 to_message!(resp, BlockHeaders, id)
182 }
183 Self::BlockBodies(resp) => {
184 to_message!(resp, BlockBodies, id)
185 }
186 Self::PooledTransactions(resp) => {
187 to_message!(resp, PooledTransactions, id)
188 }
189 Self::NodeData(resp) => {
190 to_message!(resp, NodeData, id)
191 }
192 Self::Receipts(resp) => {
193 to_message!(resp, Receipts, id)
194 }
195 Self::Receipts69(resp) => {
196 to_message!(resp, Receipts69, id)
197 }
198 }
199 }
200
201 pub fn err(&self) -> Option<&RequestError> {
203 match self {
204 Self::BlockHeaders(res) => res.as_ref().err(),
205 Self::BlockBodies(res) => res.as_ref().err(),
206 Self::PooledTransactions(res) => res.as_ref().err(),
207 Self::NodeData(res) => res.as_ref().err(),
208 Self::Receipts(res) => res.as_ref().err(),
209 Self::Receipts69(res) => res.as_ref().err(),
210 }
211 }
212
213 pub fn is_err(&self) -> bool {
215 self.err().is_some()
216 }
217}