reth_eth_wire/
capability.rs

1//! All capability related types
2
3use crate::{
4    errors::{P2PHandshakeError, P2PStreamError},
5    p2pstream::MAX_RESERVED_MESSAGE_ID,
6    protocol::{ProtoVersion, Protocol},
7    version::ParseVersionError,
8    Capability, EthMessageID, EthVersion,
9};
10use alloy_primitives::bytes::Bytes;
11use derive_more::{Deref, DerefMut};
12use reth_eth_wire_types::{EthMessage, EthNetworkPrimitives, NetworkPrimitives};
13#[cfg(feature = "serde")]
14use serde::{Deserialize, Serialize};
15use std::{
16    borrow::Cow,
17    collections::{BTreeSet, HashMap},
18};
19
20/// A Capability message consisting of the message-id and the payload
21#[derive(Debug, Clone, Eq, PartialEq)]
22#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
23pub struct RawCapabilityMessage {
24    /// Identifier of the message.
25    pub id: usize,
26    /// Actual __encoded__ payload
27    pub payload: Bytes,
28}
29
30impl RawCapabilityMessage {
31    /// Creates a new capability message with the given id and payload.
32    pub const fn new(id: usize, payload: Bytes) -> Self {
33        Self { id, payload }
34    }
35
36    /// Creates a raw message for the eth sub-protocol.
37    ///
38    /// Caller must ensure that the rlp encoded `payload` matches the given `id`.
39    ///
40    /// See also  [`EthMessage`]
41    pub const fn eth(id: EthMessageID, payload: Bytes) -> Self {
42        Self::new(id as usize, payload)
43    }
44}
45
46/// Various protocol related event types bubbled up from a session that need to be handled by the
47/// network.
48#[derive(Debug)]
49#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
50pub enum CapabilityMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
51    /// Eth sub-protocol message.
52    #[cfg_attr(
53        feature = "serde",
54        serde(bound = "EthMessage<N>: Serialize + serde::de::DeserializeOwned")
55    )]
56    Eth(EthMessage<N>),
57    /// Any other or manually crafted eth message.
58    Other(RawCapabilityMessage),
59}
60
61/// This represents a shared capability, its version, and its message id offset.
62///
63/// The [offset](SharedCapability::message_id_offset) is the message ID offset for this shared
64/// capability, determined during the rlpx handshake.
65///
66/// See also [Message-id based multiplexing](https://github.com/ethereum/devp2p/blob/master/rlpx.md#message-id-based-multiplexing)
67#[derive(Debug, Clone, PartialEq, Eq, Hash)]
68pub enum SharedCapability {
69    /// The `eth` capability.
70    Eth {
71        /// (Highest) negotiated version of the eth capability.
72        version: EthVersion,
73        /// The message ID offset for this capability.
74        ///
75        /// This represents the message ID offset for the first message of the eth capability in
76        /// the message id space.
77        offset: u8,
78    },
79    /// Any other unknown capability.
80    UnknownCapability {
81        /// Shared capability.
82        cap: Capability,
83        /// The message ID offset for this capability.
84        ///
85        /// This represents the message ID offset for the first message of the eth capability in
86        /// the message id space.
87        offset: u8,
88        /// The number of messages of this capability. Needed to calculate range of message IDs in
89        /// demuxing.
90        messages: u8,
91    },
92}
93
94impl SharedCapability {
95    /// Creates a new [`SharedCapability`] based on the given name, offset, version (and messages
96    /// if the capability is custom).
97    ///
98    /// Returns an error if the offset is equal or less than [`MAX_RESERVED_MESSAGE_ID`].
99    pub(crate) fn new(
100        name: &str,
101        version: u8,
102        offset: u8,
103        messages: u8,
104    ) -> Result<Self, SharedCapabilityError> {
105        if offset <= MAX_RESERVED_MESSAGE_ID {
106            return Err(SharedCapabilityError::ReservedMessageIdOffset(offset))
107        }
108
109        match name {
110            "eth" => Ok(Self::eth(EthVersion::try_from(version)?, offset)),
111            _ => Ok(Self::UnknownCapability {
112                cap: Capability::new(name.to_string(), version as usize),
113                offset,
114                messages,
115            }),
116        }
117    }
118
119    /// Creates a new [`SharedCapability`] based on the given name, offset, and version.
120    pub(crate) const fn eth(version: EthVersion, offset: u8) -> Self {
121        Self::Eth { version, offset }
122    }
123
124    /// Returns the capability.
125    pub const fn capability(&self) -> Cow<'_, Capability> {
126        match self {
127            Self::Eth { version, .. } => Cow::Owned(Capability::eth(*version)),
128            Self::UnknownCapability { cap, .. } => Cow::Borrowed(cap),
129        }
130    }
131
132    /// Returns the name of the capability.
133    #[inline]
134    pub fn name(&self) -> &str {
135        match self {
136            Self::Eth { .. } => "eth",
137            Self::UnknownCapability { cap, .. } => cap.name.as_ref(),
138        }
139    }
140
141    /// Returns true if the capability is eth.
142    #[inline]
143    pub const fn is_eth(&self) -> bool {
144        matches!(self, Self::Eth { .. })
145    }
146
147    /// Returns the version of the capability.
148    pub const fn version(&self) -> u8 {
149        match self {
150            Self::Eth { version, .. } => *version as u8,
151            Self::UnknownCapability { cap, .. } => cap.version as u8,
152        }
153    }
154
155    /// Returns the eth version if it's the `eth` capability.
156    pub const fn eth_version(&self) -> Option<EthVersion> {
157        match self {
158            Self::Eth { version, .. } => Some(*version),
159            _ => None,
160        }
161    }
162
163    /// Returns the message ID offset of the current capability.
164    ///
165    /// This represents the message ID offset for the first message of the eth capability in the
166    /// message id space.
167    pub const fn message_id_offset(&self) -> u8 {
168        match self {
169            Self::Eth { offset, .. } | Self::UnknownCapability { offset, .. } => *offset,
170        }
171    }
172
173    /// Returns the message ID offset of the current capability relative to the start of the
174    /// reserved message id space: [`MAX_RESERVED_MESSAGE_ID`].
175    pub const fn relative_message_id_offset(&self) -> u8 {
176        self.message_id_offset() - MAX_RESERVED_MESSAGE_ID - 1
177    }
178
179    /// Returns the number of protocol messages supported by this capability.
180    pub const fn num_messages(&self) -> u8 {
181        match self {
182            Self::Eth { version: _version, .. } => EthMessageID::max() + 1,
183            Self::UnknownCapability { messages, .. } => *messages,
184        }
185    }
186}
187
188/// Non-empty,ordered list of recognized shared capabilities.
189///
190/// Shared capabilities are ordered alphabetically by case sensitive name.
191#[derive(Debug, Clone, Deref, DerefMut, PartialEq, Eq)]
192pub struct SharedCapabilities(Vec<SharedCapability>);
193
194impl SharedCapabilities {
195    /// Merges the local and peer capabilities and returns a new [`SharedCapabilities`] instance.
196    #[inline]
197    pub fn try_new(
198        local_protocols: Vec<Protocol>,
199        peer_capabilities: Vec<Capability>,
200    ) -> Result<Self, P2PStreamError> {
201        shared_capability_offsets(local_protocols, peer_capabilities).map(Self)
202    }
203
204    /// Iterates over the shared capabilities.
205    #[inline]
206    pub fn iter_caps(&self) -> impl Iterator<Item = &SharedCapability> {
207        self.0.iter()
208    }
209
210    /// Returns the eth capability if it is shared.
211    #[inline]
212    pub fn eth(&self) -> Result<&SharedCapability, P2PStreamError> {
213        self.iter_caps().find(|c| c.is_eth()).ok_or(P2PStreamError::CapabilityNotShared)
214    }
215
216    /// Returns the negotiated eth version if it is shared.
217    #[inline]
218    pub fn eth_version(&self) -> Result<EthVersion, P2PStreamError> {
219        self.iter_caps()
220            .find_map(SharedCapability::eth_version)
221            .ok_or(P2PStreamError::CapabilityNotShared)
222    }
223
224    /// Returns true if the shared capabilities contain the given capability.
225    #[inline]
226    pub fn contains(&self, cap: &Capability) -> bool {
227        self.find(cap).is_some()
228    }
229
230    /// Returns the shared capability for the given capability.
231    #[inline]
232    pub fn find(&self, cap: &Capability) -> Option<&SharedCapability> {
233        self.0.iter().find(|c| c.version() == cap.version as u8 && c.name() == cap.name)
234    }
235
236    /// Returns the matching shared capability for the given capability offset.
237    ///
238    /// `offset` is the multiplexed message id offset of the capability relative to the reserved
239    /// message id space. In other words, counting starts at [`MAX_RESERVED_MESSAGE_ID`] + 1, which
240    /// corresponds to the first non-reserved message id.
241    ///
242    /// For example: `offset == 0` corresponds to the first shared message across the shared
243    /// capabilities and will return the first shared capability that supports messages.
244    #[inline]
245    pub fn find_by_relative_offset(&self, offset: u8) -> Option<&SharedCapability> {
246        self.find_by_offset(offset.saturating_add(MAX_RESERVED_MESSAGE_ID + 1))
247    }
248
249    /// Returns the matching shared capability for the given capability offset.
250    ///
251    /// `offset` is the multiplexed message id offset of the capability that includes the reserved
252    /// message id space.
253    ///
254    /// This will always return None if `offset` is less than or equal to
255    /// [`MAX_RESERVED_MESSAGE_ID`] because the reserved message id space is not shared.
256    #[inline]
257    pub fn find_by_offset(&self, offset: u8) -> Option<&SharedCapability> {
258        let mut iter = self.0.iter();
259        let mut cap = iter.next()?;
260        if offset < cap.message_id_offset() {
261            // reserved message id space
262            return None
263        }
264
265        for next in iter {
266            if offset < next.message_id_offset() {
267                return Some(cap)
268            }
269            cap = next
270        }
271
272        Some(cap)
273    }
274
275    /// Returns the shared capability for the given capability or an error if it's not compatible.
276    #[inline]
277    pub fn ensure_matching_capability(
278        &self,
279        cap: &Capability,
280    ) -> Result<&SharedCapability, UnsupportedCapabilityError> {
281        self.find(cap).ok_or_else(|| UnsupportedCapabilityError { capability: cap.clone() })
282    }
283
284    /// Returns the number of shared capabilities.
285    #[inline]
286    pub fn len(&self) -> usize {
287        self.0.len()
288    }
289
290    /// Returns true if there are no shared capabilities.
291    #[inline]
292    pub fn is_empty(&self) -> bool {
293        self.0.is_empty()
294    }
295}
296
297/// Determines the offsets for each shared capability between the input list of peer
298/// capabilities and the input list of locally supported [Protocol].
299///
300/// Additionally, the `p2p` capability version 5 is supported, but is
301/// expected _not_ to be in neither `local_protocols` or `peer_capabilities`.
302///
303/// **Note**: For `local_protocols` this takes [Protocol] because we need to know the number of
304/// messages per versioned capability. From the remote we only get the plain [Capability].
305#[inline]
306pub fn shared_capability_offsets(
307    local_protocols: Vec<Protocol>,
308    peer_capabilities: Vec<Capability>,
309) -> Result<Vec<SharedCapability>, P2PStreamError> {
310    // find intersection of capabilities
311    let our_capabilities =
312        local_protocols.into_iter().map(Protocol::split).collect::<HashMap<_, _>>();
313
314    // map of capability name to version
315    let mut shared_capabilities: HashMap<_, ProtoVersion> = HashMap::default();
316
317    // The `Ord` implementation for capability names should be equivalent to geth (and every other
318    // client), since geth uses golang's default string comparison, which orders strings
319    // lexicographically.
320    // https://golang.org/pkg/strings/#Compare
321    //
322    // This is important because the capability name is used to determine the message id offset, so
323    // if the sorting is not identical, offsets for connected peers could be inconsistent.
324    // This would cause the peers to send messages with the wrong message id, which is usually a
325    // protocol violation.
326    //
327    // The `Ord` implementation for `str` orders strings lexicographically.
328    let mut shared_capability_names = BTreeSet::new();
329
330    // find highest shared version of each shared capability
331    for peer_capability in peer_capabilities {
332        // if we contain this specific capability both peers share it
333        if let Some(messages) = our_capabilities.get(&peer_capability).copied() {
334            // If multiple versions are shared of the same (equal name) capability, the numerically
335            // highest wins, others are ignored
336            if shared_capabilities
337                .get(&peer_capability.name)
338                .is_none_or(|v| peer_capability.version > v.version)
339            {
340                shared_capabilities.insert(
341                    peer_capability.name.clone(),
342                    ProtoVersion { version: peer_capability.version, messages },
343                );
344                shared_capability_names.insert(peer_capability.name);
345            }
346        }
347    }
348
349    // disconnect if we don't share any capabilities
350    if shared_capabilities.is_empty() {
351        return Err(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))
352    }
353
354    // order versions based on capability name (alphabetical) and select offsets based on
355    // BASE_OFFSET + prev_total_message
356    let mut shared_with_offsets = Vec::new();
357
358    // Message IDs are assumed to be compact from ID 0x10 onwards (0x00-0x0f is reserved for the
359    // "p2p" capability) and given to each shared (equal-version, equal-name) capability in
360    // alphabetic order.
361    let mut offset = MAX_RESERVED_MESSAGE_ID + 1;
362    for name in shared_capability_names {
363        let proto_version = &shared_capabilities[&name];
364        let shared_capability = SharedCapability::new(
365            &name,
366            proto_version.version as u8,
367            offset,
368            proto_version.messages,
369        )?;
370        offset += shared_capability.num_messages();
371        shared_with_offsets.push(shared_capability);
372    }
373
374    if shared_with_offsets.is_empty() {
375        return Err(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))
376    }
377
378    Ok(shared_with_offsets)
379}
380
381/// An error that may occur while creating a [`SharedCapability`].
382#[derive(Debug, thiserror::Error)]
383pub enum SharedCapabilityError {
384    /// Unsupported `eth` version.
385    #[error(transparent)]
386    UnsupportedVersion(#[from] ParseVersionError),
387    /// Thrown when the message id for a [`SharedCapability`] overlaps with the reserved p2p
388    /// message id space [`MAX_RESERVED_MESSAGE_ID`].
389    #[error("message id offset `{0}` is reserved")]
390    ReservedMessageIdOffset(u8),
391}
392
393/// An error thrown when capabilities mismatch.
394#[derive(Debug, thiserror::Error)]
395#[error("unsupported capability {capability}")]
396pub struct UnsupportedCapabilityError {
397    capability: Capability,
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Capabilities, Capability};

    /// Creates the `eth` capability for the raw `version` at the first non-reserved offset and
    /// checks its name, version and resulting variant.
    #[track_caller]
    fn assert_eth_capability(version: u8, messages: u8, expected: EthVersion) {
        let offset = MAX_RESERVED_MESSAGE_ID + 1;
        let capability = SharedCapability::new("eth", version, offset, messages).unwrap();

        assert_eq!(capability.name(), "eth");
        assert_eq!(capability.version(), version);
        assert_eq!(capability, SharedCapability::Eth { version: expected, offset });
    }

    #[test]
    fn from_eth_68() {
        assert_eth_capability(68, 13, EthVersion::Eth68);
    }

    #[test]
    fn from_eth_67() {
        assert_eth_capability(67, 13, EthVersion::Eth67);
    }

    #[test]
    fn from_eth_66() {
        assert_eth_capability(66, 15, EthVersion::Eth66);
    }

    #[test]
    fn capabilities_supports_eth() {
        let capabilities: Capabilities = vec![
            Capability::new_static("eth", 66),
            Capability::new_static("eth", 67),
            Capability::new_static("eth", 68),
        ]
        .into();

        assert!(capabilities.supports_eth());
        assert!(capabilities.supports_eth_v66());
        assert!(capabilities.supports_eth_v67());
        assert!(capabilities.supports_eth_v68());
    }

    #[test]
    fn test_peer_capability_version_zero() {
        let cap = Capability::new_static("TestName", 0);
        let local_capabilities: Vec<Protocol> =
            vec![Protocol::new(cap.clone(), 0), EthVersion::Eth67.into(), EthVersion::Eth68.into()];
        let peer_capabilities = vec![cap.clone()];

        let shared = shared_capability_offsets(local_capabilities, peer_capabilities).unwrap();
        assert_eq!(shared.len(), 1);
        assert_eq!(shared[0], SharedCapability::UnknownCapability { cap, offset: 16, messages: 0 });
    }

    #[test]
    fn test_peer_lower_capability_version() {
        let local_capabilities: Vec<Protocol> =
            vec![EthVersion::Eth66.into(), EthVersion::Eth67.into(), EthVersion::Eth68.into()];
        let peer_capabilities: Vec<Capability> = vec![EthVersion::Eth66.into()];

        let shared_capability =
            shared_capability_offsets(local_capabilities, peer_capabilities).unwrap()[0].clone();

        assert_eq!(
            shared_capability,
            SharedCapability::Eth {
                version: EthVersion::Eth66,
                offset: MAX_RESERVED_MESSAGE_ID + 1
            }
        );
    }

    #[test]
    fn test_peer_capability_version_too_low() {
        let local: Vec<Protocol> = vec![EthVersion::Eth67.into()];
        let peer_capabilities: Vec<Capability> = vec![EthVersion::Eth66.into()];

        let shared_capability = shared_capability_offsets(local, peer_capabilities);

        assert!(matches!(
            shared_capability,
            Err(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))
        ));
    }

    #[test]
    fn test_peer_capability_version_too_high() {
        let local_capabilities = vec![EthVersion::Eth66.into()];
        let peer_capabilities = vec![EthVersion::Eth67.into()];

        let shared_capability = shared_capability_offsets(local_capabilities, peer_capabilities);

        assert!(matches!(
            shared_capability,
            Err(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))
        ));
    }

    #[test]
    fn test_find_by_offset() {
        let local_capabilities = vec![EthVersion::Eth66.into()];
        let peer_capabilities = vec![EthVersion::Eth66.into()];

        let shared = SharedCapabilities::try_new(local_capabilities, peer_capabilities).unwrap();

        let shared_eth = shared.find_by_relative_offset(0).unwrap();
        assert_eq!(shared_eth.name(), "eth");

        let shared_eth = shared.find_by_offset(MAX_RESERVED_MESSAGE_ID + 1).unwrap();
        assert_eq!(shared_eth.name(), "eth");

        // reserved message id space
        assert!(shared.find_by_offset(MAX_RESERVED_MESSAGE_ID).is_none());
    }

    #[test]
    fn test_find_by_offset_many() {
        let cap = Capability::new_static("aaa", 1);
        let proto = Protocol::new(cap.clone(), 5);
        let local_capabilities = vec![proto.clone(), EthVersion::Eth66.into()];
        let peer_capabilities = vec![cap, EthVersion::Eth66.into()];

        let shared = SharedCapabilities::try_new(local_capabilities, peer_capabilities).unwrap();

        // the first shared message belongs to the aaa capability, which sorts before eth
        let shared_aaa = shared.find_by_relative_offset(0).unwrap();
        assert_eq!(shared_aaa.name(), proto.cap.name);

        let shared_aaa = shared.find_by_offset(MAX_RESERVED_MESSAGE_ID + 1).unwrap();
        assert_eq!(shared_aaa.name(), proto.cap.name);

        // the 5th shared message (0,1,2,3,4) is the last message of the aaa capability
        let shared_aaa = shared.find_by_relative_offset(4).unwrap();
        assert_eq!(shared_aaa.name(), proto.cap.name);
        let shared_aaa = shared.find_by_offset(MAX_RESERVED_MESSAGE_ID + 5).unwrap();
        assert_eq!(shared_aaa.name(), proto.cap.name);

        // the 6th shared message (relative offset 5) is the first message of the eth capability
        let shared_eth = shared.find_by_relative_offset(proto.messages()).unwrap();
        assert_eq!(shared_eth.name(), "eth");

        // the message after that one still belongs to the eth capability
        let shared_eth = shared.find_by_relative_offset(1 + proto.messages()).unwrap();
        assert_eq!(shared_eth.name(), "eth");
    }
}