reth_network_p2p/test_utils/
bodies.rs

1use crate::{
2    bodies::client::{BodiesClient, BodiesFut},
3    download::DownloadClient,
4    error::PeerRequestResult,
5    priority::Priority,
6};
7use alloy_primitives::B256;
8use futures::FutureExt;
9use reth_primitives::BlockBody;
10use std::fmt::{Debug, Formatter};
11use tokio::sync::oneshot;
12
13/// A test client for fetching bodies
/// Bodies client intended for tests.
///
/// The client holds no state of its own besides the user-supplied `responder`,
/// which decides what every body request resolves to.
pub struct TestBodiesClient<F> {
    /// Callback invoked for each body request; its return value becomes the
    /// response of that request.
    pub responder: F,
}
18
19impl<F> Debug for TestBodiesClient<F> {
20    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
21        f.debug_struct("TestBodiesClient").finish_non_exhaustive()
22    }
23}
24
25impl<F: Sync + Send> DownloadClient for TestBodiesClient<F> {
26    fn report_bad_message(&self, _peer_id: reth_network_peers::PeerId) {
27        // noop
28    }
29
30    fn num_connected_peers(&self) -> usize {
31        0
32    }
33}
34
35impl<F> BodiesClient for TestBodiesClient<F>
36where
37    F: Fn(Vec<B256>) -> PeerRequestResult<Vec<BlockBody>> + Send + Sync,
38{
39    type Body = BlockBody;
40    type Output = BodiesFut;
41
42    fn get_block_bodies_with_priority(
43        &self,
44        hashes: Vec<B256>,
45        _priority: Priority,
46    ) -> Self::Output {
47        let (tx, rx) = oneshot::channel();
48        let _ = tx.send((self.responder)(hashes));
49        Box::pin(rx.map(|x| match x {
50            Ok(value) => value,
51            Err(err) => Err(err.into()),
52        }))
53    }
54}