reth_eth_wire_types/
message.rs

//! Implements Ethereum wire protocol for versions 66, 67, 68, and 69.
2//! Defines structs/enums for messages, request-response pairs, and broadcasts.
3//! Handles compatibility with [`EthVersion`].
4//!
5//! Examples include creating, encoding, and decoding protocol messages.
6//!
7//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol).
8
9use super::{
10    broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
11    GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes66,
12    NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
13    Transactions,
14};
15use crate::{
16    status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
17    RawCapabilityMessage, Receipts69, SharedTransactions,
18};
19use alloc::{boxed::Box, sync::Arc};
20use alloy_primitives::{
21    bytes::{Buf, BufMut},
22    Bytes,
23};
24use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
25use core::fmt::Debug;
26
/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
///
/// The value is 10 MiB (`10 * 1024 * 1024` bytes). See the go-ethereum reference linked below.
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
30
/// Error when sending/receiving a message
#[derive(thiserror::Error, Debug)]
pub enum MessageError {
    /// The message ID is not valid for the given protocol version.
    ///
    /// Holds the [`EthVersion`] of the connection followed by the offending [`EthMessageID`].
    /// Note that the display string prints the ID first and the version second.
    #[error("message id {1:?} is invalid for version {0:?}")]
    Invalid(EthVersion, EthMessageID),
    /// Thrown when rlp decoding a message failed.
    ///
    /// Any [`alloy_rlp::Error`] converts into this variant through the generated `From` impl,
    /// so decoding code can simply use `?`.
    #[error("RLP error: {0}")]
    RlpError(#[from] alloy_rlp::Error),
}
41
/// An `eth` protocol message, containing a message ID and payload.
///
/// When encoded, the ID is written first and the payload follows it directly.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The unique identifier representing the type of the Ethereum message.
    pub message_type: EthMessageID,
    /// The content of the message, including specific data based on the message type.
    // The explicit serde bound replaces the bounds serde would otherwise infer for `N`.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
    )]
    pub message: EthMessage<N>,
}
55
56impl<N: NetworkPrimitives> ProtocolMessage<N> {
57    /// Create a new `ProtocolMessage` from a message type and message rlp bytes.
58    ///
59    /// This will enforce decoding according to the given [`EthVersion`] of the connection.
60    pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
61        let message_type = EthMessageID::decode(buf)?;
62
63        // For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md):
64        // pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it.
65        let message = match message_type {
66            EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
67                StatusMessage::Legacy(Status::decode(buf)?)
68            } else {
69                StatusMessage::Eth69(StatusEth69::decode(buf)?)
70            }),
71            EthMessageID::NewBlockHashes => {
72                if version.is_eth69() {
73                    return Err(MessageError::Invalid(version, EthMessageID::NewBlockHashes));
74                }
75                EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
76            }
77            EthMessageID::NewBlock => {
78                if version.is_eth69() {
79                    return Err(MessageError::Invalid(version, EthMessageID::NewBlock));
80                }
81                EthMessage::NewBlock(Box::new(NewBlock::decode(buf)?))
82            }
83            EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
84            EthMessageID::NewPooledTransactionHashes => {
85                if version >= EthVersion::Eth68 {
86                    EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
87                        buf,
88                    )?)
89                } else {
90                    EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
91                        buf,
92                    )?)
93                }
94            }
95            EthMessageID::GetBlockHeaders => EthMessage::GetBlockHeaders(RequestPair::decode(buf)?),
96            EthMessageID::BlockHeaders => EthMessage::BlockHeaders(RequestPair::decode(buf)?),
97            EthMessageID::GetBlockBodies => EthMessage::GetBlockBodies(RequestPair::decode(buf)?),
98            EthMessageID::BlockBodies => EthMessage::BlockBodies(RequestPair::decode(buf)?),
99            EthMessageID::GetPooledTransactions => {
100                EthMessage::GetPooledTransactions(RequestPair::decode(buf)?)
101            }
102            EthMessageID::PooledTransactions => {
103                EthMessage::PooledTransactions(RequestPair::decode(buf)?)
104            }
105            EthMessageID::GetNodeData => {
106                if version >= EthVersion::Eth67 {
107                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
108                }
109                EthMessage::GetNodeData(RequestPair::decode(buf)?)
110            }
111            EthMessageID::NodeData => {
112                if version >= EthVersion::Eth67 {
113                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
114                }
115                EthMessage::NodeData(RequestPair::decode(buf)?)
116            }
117            EthMessageID::GetReceipts => EthMessage::GetReceipts(RequestPair::decode(buf)?),
118            EthMessageID::Receipts => {
119                if version < EthVersion::Eth69 {
120                    EthMessage::Receipts(RequestPair::decode(buf)?)
121                } else {
122                    // with eth69, receipts no longer include the bloom
123                    EthMessage::Receipts69(RequestPair::decode(buf)?)
124                }
125            }
126            EthMessageID::BlockRangeUpdate => {
127                if version < EthVersion::Eth69 {
128                    return Err(MessageError::Invalid(version, EthMessageID::BlockRangeUpdate))
129                }
130                EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
131            }
132            EthMessageID::Other(_) => {
133                let raw_payload = Bytes::copy_from_slice(buf);
134                buf.advance(raw_payload.len());
135                EthMessage::Other(RawCapabilityMessage::new(
136                    message_type.to_u8() as usize,
137                    raw_payload.into(),
138                ))
139            }
140        };
141        Ok(Self { message_type, message })
142    }
143}
144
145impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
146    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
147    /// prepended to the message.
148    fn encode(&self, out: &mut dyn BufMut) {
149        self.message_type.encode(out);
150        self.message.encode(out);
151    }
152    fn length(&self) -> usize {
153        self.message_type.length() + self.message.length()
154    }
155}
156
157impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
158    fn from(message: EthMessage<N>) -> Self {
159        Self { message_type: message.message_id(), message }
160    }
161}
162
/// Represents messages that can be sent to multiple peers.
///
/// Pairs an [`EthBroadcastMessage`] with its [`EthMessageID`]; when encoded, the ID is written
/// first and the message follows it directly.
#[derive(Clone, Debug)]
pub struct ProtocolBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The unique identifier representing the type of the Ethereum message.
    pub message_type: EthMessageID,
    /// The content of the message to be broadcasted, including specific data based on the message
    /// type.
    pub message: EthBroadcastMessage<N>,
}
172
173impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
174    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
175    /// prepended to the message.
176    fn encode(&self, out: &mut dyn BufMut) {
177        self.message_type.encode(out);
178        self.message.encode(out);
179    }
180    fn length(&self) -> usize {
181        self.message_type.length() + self.message.length()
182    }
183}
184
185impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
186    fn from(message: EthBroadcastMessage<N>) -> Self {
187        Self { message_type: message.message_id(), message }
188    }
189}
190
/// Represents a message in the eth wire protocol, versions 66, 67, 68 and 69.
///
/// The ethereum wire protocol is a set of messages that are broadcast to the network in two
/// styles:
///  * A request message sent by a peer (such as [`GetPooledTransactions`]), and an associated
///    response message (such as [`PooledTransactions`]).
///  * A message that is broadcast to the network, without a corresponding request.
///
/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
/// correlate request-response message pairs. This allows for request multiplexing.
///
/// The `eth/67` is based on `eth/66` but only removes two messages, [`GetNodeData`] and
/// [`NodeData`].
///
/// The `eth/68` changes only `NewPooledTransactionHashes` to include `types` and `sizes`. For
/// it, `NewPooledTransactionHashes` is renamed as [`NewPooledTransactionHashes66`] and
/// [`NewPooledTransactionHashes68`] is defined.
///
/// The `eth/69` announces the historical block range served by the node, removes total
/// difficulty information, and removes the Bloom field from receipts transferred over the
/// protocol.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents a Status message required for the protocol handshake.
    Status(StatusMessage),
    /// Represents a `NewBlockHashes` message broadcast to the network.
    NewBlockHashes(NewBlockHashes),
    /// Represents a `NewBlock` message broadcast to the network.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Block: serde::Serialize + serde::de::DeserializeOwned")
    )]
    NewBlock(Box<NewBlock<N::Block>>),
    /// Represents a Transactions message broadcast to the network.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Transactions(Transactions<N::BroadcastedTransaction>),
    /// Represents a `NewPooledTransactionHashes` message for eth/66 version.
    NewPooledTransactionHashes66(NewPooledTransactionHashes66),
    /// Represents a `NewPooledTransactionHashes` message for eth/68 version.
    NewPooledTransactionHashes68(NewPooledTransactionHashes68),
    // The following messages are request-response message pairs
    /// Represents a `GetBlockHeaders` request-response pair.
    GetBlockHeaders(RequestPair<GetBlockHeaders>),
    /// Represents a `BlockHeaders` request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
    /// Represents a `GetBlockBodies` request-response pair.
    GetBlockBodies(RequestPair<GetBlockBodies>),
    /// Represents a `BlockBodies` request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
    /// Represents a `GetPooledTransactions` request-response pair.
    GetPooledTransactions(RequestPair<GetPooledTransactions>),
    /// Represents a `PooledTransactions` request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
    /// Represents a `GetNodeData` request-response pair.
    GetNodeData(RequestPair<GetNodeData>),
    /// Represents a `NodeData` request-response pair.
    NodeData(RequestPair<NodeData>),
    /// Represents a `GetReceipts` request-response pair.
    GetReceipts(RequestPair<GetReceipts>),
    /// Represents a Receipts request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Receipts(RequestPair<Receipts<N::Receipt>>),
    /// Represents a Receipts request-response pair for eth/69.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Receipts69(RequestPair<Receipts69<N::Receipt>>),
    /// Represents a `BlockRangeUpdate` message broadcast to the network.
    // NOTE(review): `BlockRangeUpdate` takes no type parameter here, so the bound on
    // `N::BroadcastedTransaction` below looks copied from `Transactions` - confirm it is needed.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockRangeUpdate(BlockRangeUpdate),
    /// Represents an encoded message that doesn't match any other variant
    Other(RawCapabilityMessage),
}
286
287impl<N: NetworkPrimitives> EthMessage<N> {
288    /// Returns the message's ID.
289    pub const fn message_id(&self) -> EthMessageID {
290        match self {
291            Self::Status(_) => EthMessageID::Status,
292            Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
293            Self::NewBlock(_) => EthMessageID::NewBlock,
294            Self::Transactions(_) => EthMessageID::Transactions,
295            Self::NewPooledTransactionHashes66(_) | Self::NewPooledTransactionHashes68(_) => {
296                EthMessageID::NewPooledTransactionHashes
297            }
298            Self::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
299            Self::BlockHeaders(_) => EthMessageID::BlockHeaders,
300            Self::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
301            Self::BlockBodies(_) => EthMessageID::BlockBodies,
302            Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions,
303            Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
304            Self::GetNodeData(_) => EthMessageID::GetNodeData,
305            Self::NodeData(_) => EthMessageID::NodeData,
306            Self::GetReceipts(_) => EthMessageID::GetReceipts,
307            Self::Receipts(_) | Self::Receipts69(_) => EthMessageID::Receipts,
308            Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
309            Self::Other(msg) => EthMessageID::Other(msg.id as u8),
310        }
311    }
312
313    /// Returns true if the message variant is a request.
314    pub const fn is_request(&self) -> bool {
315        matches!(
316            self,
317            Self::GetBlockBodies(_) |
318                Self::GetBlockHeaders(_) |
319                Self::GetReceipts(_) |
320                Self::GetPooledTransactions(_) |
321                Self::GetNodeData(_)
322        )
323    }
324
325    /// Returns true if the message variant is a response to a request.
326    pub const fn is_response(&self) -> bool {
327        matches!(
328            self,
329            Self::PooledTransactions(_) |
330                Self::Receipts(_) |
331                Self::BlockHeaders(_) |
332                Self::BlockBodies(_) |
333                Self::NodeData(_)
334        )
335    }
336}
337
338impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
339    fn encode(&self, out: &mut dyn BufMut) {
340        match self {
341            Self::Status(status) => status.encode(out),
342            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
343            Self::NewBlock(new_block) => new_block.encode(out),
344            Self::Transactions(transactions) => transactions.encode(out),
345            Self::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
346            Self::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
347            Self::GetBlockHeaders(request) => request.encode(out),
348            Self::BlockHeaders(headers) => headers.encode(out),
349            Self::GetBlockBodies(request) => request.encode(out),
350            Self::BlockBodies(bodies) => bodies.encode(out),
351            Self::GetPooledTransactions(request) => request.encode(out),
352            Self::PooledTransactions(transactions) => transactions.encode(out),
353            Self::GetNodeData(request) => request.encode(out),
354            Self::NodeData(data) => data.encode(out),
355            Self::GetReceipts(request) => request.encode(out),
356            Self::Receipts(receipts) => receipts.encode(out),
357            Self::Receipts69(receipt69) => receipt69.encode(out),
358            Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
359            Self::Other(unknown) => out.put_slice(&unknown.payload),
360        }
361    }
362    fn length(&self) -> usize {
363        match self {
364            Self::Status(status) => status.length(),
365            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
366            Self::NewBlock(new_block) => new_block.length(),
367            Self::Transactions(transactions) => transactions.length(),
368            Self::NewPooledTransactionHashes66(hashes) => hashes.length(),
369            Self::NewPooledTransactionHashes68(hashes) => hashes.length(),
370            Self::GetBlockHeaders(request) => request.length(),
371            Self::BlockHeaders(headers) => headers.length(),
372            Self::GetBlockBodies(request) => request.length(),
373            Self::BlockBodies(bodies) => bodies.length(),
374            Self::GetPooledTransactions(request) => request.length(),
375            Self::PooledTransactions(transactions) => transactions.length(),
376            Self::GetNodeData(request) => request.length(),
377            Self::NodeData(data) => data.length(),
378            Self::GetReceipts(request) => request.length(),
379            Self::Receipts(receipts) => receipts.length(),
380            Self::Receipts69(receipt69) => receipt69.length(),
381            Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
382            Self::Other(unknown) => unknown.length(),
383        }
384    }
385}
386
/// Represents broadcast messages of [`EthMessage`] with the same object that can be sent to
/// multiple peers.
///
/// Messages that contain a list of hashes depend on the peer the message is sent to. A peer should
/// never receive a hash of an object (block, transaction) it has already seen.
///
/// Note: This is only useful for outgoing messages.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents a new block broadcast message.
    ///
    /// The block is held behind an [`Arc`], so cloning the message does not copy the block.
    NewBlock(Arc<NewBlock<N::Block>>),
    /// Represents a transactions broadcast message.
    Transactions(SharedTransactions<N::BroadcastedTransaction>),
}
401
402// === impl EthBroadcastMessage ===
403
404impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
405    /// Returns the message's ID.
406    pub const fn message_id(&self) -> EthMessageID {
407        match self {
408            Self::NewBlock(_) => EthMessageID::NewBlock,
409            Self::Transactions(_) => EthMessageID::Transactions,
410        }
411    }
412}
413
414impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
415    fn encode(&self, out: &mut dyn BufMut) {
416        match self {
417            Self::NewBlock(new_block) => new_block.encode(out),
418            Self::Transactions(transactions) => transactions.encode(out),
419        }
420    }
421
422    fn length(&self) -> usize {
423        match self {
424            Self::NewBlock(new_block) => new_block.length(),
425            Self::Transactions(transactions) => transactions.length(),
426        }
427    }
428}
429
/// Represents message IDs for eth protocol messages.
///
/// The explicit discriminants are the byte values used when the ID is encoded. The values
/// `0x0b` and `0x0c` are not assigned to any named variant here; like every other unassigned
/// byte they are represented by [`EthMessageID::Other`].
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessageID {
    /// Status message.
    Status = 0x00,
    /// New block hashes message.
    NewBlockHashes = 0x01,
    /// Transactions message.
    Transactions = 0x02,
    /// Get block headers message.
    GetBlockHeaders = 0x03,
    /// Block headers message.
    BlockHeaders = 0x04,
    /// Get block bodies message.
    GetBlockBodies = 0x05,
    /// Block bodies message.
    BlockBodies = 0x06,
    /// New block message.
    NewBlock = 0x07,
    /// New pooled transaction hashes message.
    NewPooledTransactionHashes = 0x08,
    /// Requests pooled transactions.
    GetPooledTransactions = 0x09,
    /// Represents pooled transactions.
    PooledTransactions = 0x0a,
    /// Requests node data.
    GetNodeData = 0x0d,
    /// Represents node data.
    NodeData = 0x0e,
    /// Requests receipts.
    GetReceipts = 0x0f,
    /// Represents receipts.
    Receipts = 0x10,
    /// Block range update.
    ///
    /// Introduced in Eth69
    BlockRangeUpdate = 0x11,
    /// Represents unknown message types.
    ///
    /// Carries the raw ID byte that did not match any named variant.
    Other(u8),
}
472
473impl EthMessageID {
474    /// Returns the corresponding `u8` value for an `EthMessageID`.
475    pub const fn to_u8(&self) -> u8 {
476        match self {
477            Self::Status => 0x00,
478            Self::NewBlockHashes => 0x01,
479            Self::Transactions => 0x02,
480            Self::GetBlockHeaders => 0x03,
481            Self::BlockHeaders => 0x04,
482            Self::GetBlockBodies => 0x05,
483            Self::BlockBodies => 0x06,
484            Self::NewBlock => 0x07,
485            Self::NewPooledTransactionHashes => 0x08,
486            Self::GetPooledTransactions => 0x09,
487            Self::PooledTransactions => 0x0a,
488            Self::GetNodeData => 0x0d,
489            Self::NodeData => 0x0e,
490            Self::GetReceipts => 0x0f,
491            Self::Receipts => 0x10,
492            Self::BlockRangeUpdate => 0x11,
493            Self::Other(value) => *value, // Return the stored `u8`
494        }
495    }
496
497    /// Returns the max value for the given version.
498    pub const fn max(version: EthVersion) -> u8 {
499        if version.is_eth69() {
500            Self::BlockRangeUpdate.to_u8()
501        } else {
502            Self::Receipts.to_u8()
503        }
504    }
505}
506
507impl Encodable for EthMessageID {
508    fn encode(&self, out: &mut dyn BufMut) {
509        out.put_u8(self.to_u8());
510    }
511    fn length(&self) -> usize {
512        1
513    }
514}
515
516impl Decodable for EthMessageID {
517    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
518        let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
519            0x00 => Self::Status,
520            0x01 => Self::NewBlockHashes,
521            0x02 => Self::Transactions,
522            0x03 => Self::GetBlockHeaders,
523            0x04 => Self::BlockHeaders,
524            0x05 => Self::GetBlockBodies,
525            0x06 => Self::BlockBodies,
526            0x07 => Self::NewBlock,
527            0x08 => Self::NewPooledTransactionHashes,
528            0x09 => Self::GetPooledTransactions,
529            0x0a => Self::PooledTransactions,
530            0x0d => Self::GetNodeData,
531            0x0e => Self::NodeData,
532            0x0f => Self::GetReceipts,
533            0x10 => Self::Receipts,
534            0x11 => Self::BlockRangeUpdate,
535            unknown => Self::Other(*unknown),
536        };
537        buf.advance(1);
538        Ok(id)
539    }
540}
541
542impl TryFrom<usize> for EthMessageID {
543    type Error = &'static str;
544
545    fn try_from(value: usize) -> Result<Self, Self::Error> {
546        match value {
547            0x00 => Ok(Self::Status),
548            0x01 => Ok(Self::NewBlockHashes),
549            0x02 => Ok(Self::Transactions),
550            0x03 => Ok(Self::GetBlockHeaders),
551            0x04 => Ok(Self::BlockHeaders),
552            0x05 => Ok(Self::GetBlockBodies),
553            0x06 => Ok(Self::BlockBodies),
554            0x07 => Ok(Self::NewBlock),
555            0x08 => Ok(Self::NewPooledTransactionHashes),
556            0x09 => Ok(Self::GetPooledTransactions),
557            0x0a => Ok(Self::PooledTransactions),
558            0x0d => Ok(Self::GetNodeData),
559            0x0e => Ok(Self::NodeData),
560            0x0f => Ok(Self::GetReceipts),
561            0x10 => Ok(Self::Receipts),
562            0x11 => Ok(Self::BlockRangeUpdate),
563            _ => Err("Invalid message ID"),
564        }
565    }
566}
567
/// This is used for all request-response style `eth` protocol messages.
/// This can represent either a request or a response, since both include a message payload and
/// request id.
///
/// On the wire the pair is an RLP list holding the request id followed by the message.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct RequestPair<T> {
    /// id for the contained request or response message
    pub request_id: u64,

    /// the request or response message payload
    pub message: T,
}
581
582impl<T> RequestPair<T> {
583    /// Converts the message type with the given closure.
584    pub fn map<F, R>(self, f: F) -> RequestPair<R>
585    where
586        F: FnOnce(T) -> R,
587    {
588        let Self { request_id, message } = self;
589        RequestPair { request_id, message: f(message) }
590    }
591}
592
593/// Allows messages with request ids to be serialized into RLP bytes.
594impl<T> Encodable for RequestPair<T>
595where
596    T: Encodable,
597{
598    fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
599        let header =
600            Header { list: true, payload_length: self.request_id.length() + self.message.length() };
601
602        header.encode(out);
603        self.request_id.encode(out);
604        self.message.encode(out);
605    }
606
607    fn length(&self) -> usize {
608        let mut length = 0;
609        length += self.request_id.length();
610        length += self.message.length();
611        length += length_of_length(length);
612        length
613    }
614}
615
616/// Allows messages with request ids to be deserialized into RLP bytes.
617impl<T> Decodable for RequestPair<T>
618where
619    T: Decodable,
620{
621    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
622        let header = Header::decode(buf)?;
623
624        let initial_length = buf.len();
625        let request_id = u64::decode(buf)?;
626        let message = T::decode(buf)?;
627
628        // Check that the buffer consumed exactly payload_length bytes after decoding the
629        // RequestPair
630        let consumed_len = initial_length - buf.len();
631        if consumed_len != header.payload_length {
632            return Err(alloy_rlp::Error::UnexpectedLength)
633        }
634
635        Ok(Self { request_id, message })
636    }
637}
638
#[cfg(test)]
mod tests {
    use super::MessageError;
    use crate::{
        message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
        GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
    };
    use alloy_primitives::hex;
    use alloy_rlp::{Decodable, Encodable, Error};
    use reth_ethereum_primitives::BlockBody;

    /// Encodes `value` into a freshly allocated byte vector.
    fn encode<T: Encodable>(value: T) -> Vec<u8> {
        let mut out = Vec::new();
        value.encode(&mut out);
        out
    }

    /// `GetNodeData` and `NodeData` must be rejected when decoding for eth/67.
    #[test]
    fn test_removed_message_at_eth67() {
        let request = EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
            request_id: 1337,
            message: GetNodeData(vec![]),
        });
        let encoded_request =
            encode(ProtocolMessage { message_type: EthMessageID::GetNodeData, message: request });
        let decoded_request = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth67,
            &mut encoded_request.as_slice(),
        );
        assert!(matches!(decoded_request, Err(MessageError::Invalid(..))));

        let response = EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
            request_id: 1337,
            message: NodeData(vec![]),
        });
        let encoded_response =
            encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: response });
        let decoded_response = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth67,
            &mut encoded_response.as_slice(),
        );
        assert!(matches!(decoded_response, Err(MessageError::Invalid(..))));
    }

    #[test]
    fn request_pair_encode() {
        // Byte-by-byte layout of the expected encoding:
        //
        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
        // 82: 0x80 + len(1337)
        // 05 39: 1337 (request_id)
        // === full_list ===
        // c1: start of list (c0) + len(list) (length is <55 bytes)
        // 05: 5 (message)
        let expected = hex!("c5820539c105");

        let got = encode(RequestPair { request_id: 1337, message: vec![5u8] });
        assert_eq!(expected[..], got, "expected: {expected:X?}, got: {got:X?}",);
    }

    #[test]
    fn request_pair_decode() {
        let raw_pair = hex!("c5820539c105");
        let expected = RequestPair { request_id: 1337, message: vec![5u8] };

        let mut input = &raw_pair[..];
        let got = RequestPair::<Vec<u8>>::decode(&mut input).unwrap();

        assert_eq!(expected.length(), raw_pair.len());
        assert_eq!(expected, got);
    }

    #[test]
    fn malicious_request_pair_decode() {
        // A maliciously encoded request pair, where the len(full_list) is 5, but it
        // actually consumes 6 bytes when decoding
        //
        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
        // 82: 0x80 + len(1337)
        // 05 39: 1337 (request_id)
        // === full_list ===
        // c2: start of list (c0) + len(list) (length is <55 bytes)
        // 05 05: 5 5(message)
        let raw_pair = hex!("c5820539c20505");

        let mut input = &raw_pair[..];
        let outcome = RequestPair::<Vec<u8>>::decode(&mut input);
        assert!(matches!(outcome, Err(Error::UnexpectedLength)));
    }

    /// A `BlockBodies` response without any bodies survives an encode/decode roundtrip.
    #[test]
    fn empty_block_bodies_protocol() {
        let original =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
                request_id: 0,
                message: Default::default(),
            }));

        let mut encoded = Vec::new();
        original.encode(&mut encoded);

        let roundtripped =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut encoded.as_slice()).unwrap();
        assert_eq!(original, roundtripped);
    }

    /// A `BlockBodies` response holding one empty body survives an encode/decode roundtrip.
    #[test]
    fn empty_block_body_protocol() {
        let empty_body = BlockBody {
            transactions: vec![],
            ommers: vec![],
            withdrawals: Some(Default::default()),
        };
        let original =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
                request_id: 0,
                message: vec![empty_body].into(),
            }));

        let mut encoded = Vec::new();
        original.encode(&mut encoded);

        let roundtripped =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut encoded.as_slice()).unwrap();
        assert_eq!(original, roundtripped);
    }

    /// A `BlockBodies` message (id `06`) with a malformed body must fail with `InputTooShort`.
    #[test]
    fn decode_block_bodies_message() {
        let raw = hex!("06c48199c1c0");
        let err = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut &raw[..],
        )
        .unwrap_err();
        assert!(matches!(err, MessageError::RlpError(alloy_rlp::Error::InputTooShort)));
    }

    /// A message with an unknown id keeps its raw payload across an encode/decode roundtrip.
    #[test]
    fn custom_message_roundtrip() {
        let payload = vec![1, 2, 3, 4, 5];
        let original = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x20),
            message: EthMessage::Other(RawCapabilityMessage::new(0x20, payload.into())),
        };

        let encoded = encode(original.clone());
        let roundtripped = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut encoded.as_slice(),
        )
        .unwrap();

        assert_eq!(original, roundtripped);
    }

    /// Same as `custom_message_roundtrip`, but with an empty payload.
    #[test]
    fn custom_message_empty_payload_roundtrip() {
        let original = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x30),
            message: EthMessage::Other(RawCapabilityMessage::new(0x30, vec![].into())),
        };

        let encoded = encode(original.clone());
        let roundtripped = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut encoded.as_slice(),
        )
        .unwrap();

        assert_eq!(original, roundtripped);
    }
}