reth_ipc/server/
rpc_service.rs

1//! JSON-RPC service middleware.
2use futures_util::future::BoxFuture;
3use jsonrpsee::{
4    server::{
5        middleware::rpc::{ResponseFuture, RpcServiceT},
6        IdProvider,
7    },
8    types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
9    BoundedSubscriptions, ConnectionId, MethodCallback, MethodResponse, MethodSink, Methods,
10    SubscriptionState,
11};
12use std::sync::Arc;
13
14/// JSON-RPC service middleware.
15#[derive(Clone, Debug)]
16pub struct RpcService {
17    conn_id: ConnectionId,
18    methods: Methods,
19    max_response_body_size: usize,
20    cfg: RpcServiceCfg,
21}
22
23/// Configuration of the `RpcService`.
24#[allow(dead_code)]
25#[derive(Clone, Debug)]
26pub(crate) enum RpcServiceCfg {
27    /// The server supports only calls.
28    OnlyCalls,
29    /// The server supports both method calls and subscriptions.
30    CallsAndSubscriptions {
31        bounded_subscriptions: BoundedSubscriptions,
32        sink: MethodSink,
33        id_provider: Arc<dyn IdProvider>,
34    },
35}
36
37impl RpcService {
38    /// Create a new service.
39    pub(crate) const fn new(
40        methods: Methods,
41        max_response_body_size: usize,
42        conn_id: ConnectionId,
43        cfg: RpcServiceCfg,
44    ) -> Self {
45        Self { methods, max_response_body_size, conn_id, cfg }
46    }
47}
48
49impl<'a> RpcServiceT<'a> for RpcService {
50    // The rpc module is already boxing the futures and
51    // it's used to under the hood by the RpcService.
52    type Future = ResponseFuture<BoxFuture<'a, MethodResponse>>;
53
54    fn call(&self, req: Request<'a>) -> Self::Future {
55        let conn_id = self.conn_id;
56        let max_response_body_size = self.max_response_body_size;
57
58        let params = req.params();
59        let name = req.method_name();
60        let id = req.id().clone();
61        let extensions = req.extensions.clone();
62
63        match self.methods.method_with_name(name) {
64            None => {
65                let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
66                ResponseFuture::ready(rp)
67            }
68            Some((_name, method)) => match method {
69                MethodCallback::Sync(callback) => {
70                    let rp = (callback)(id, params, max_response_body_size, extensions);
71                    ResponseFuture::ready(rp)
72                }
73                MethodCallback::Async(callback) => {
74                    let params = params.into_owned();
75                    let id = id.into_owned();
76
77                    let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
78                    ResponseFuture::future(fut)
79                }
80                MethodCallback::Subscription(callback) => {
81                    let RpcServiceCfg::CallsAndSubscriptions {
82                        bounded_subscriptions,
83                        sink,
84                        id_provider,
85                    } = self.cfg.clone()
86                    else {
87                        tracing::warn!("Subscriptions not supported");
88                        let rp =
89                            MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
90                        return ResponseFuture::ready(rp);
91                    };
92
93                    if let Some(p) = bounded_subscriptions.acquire() {
94                        let conn_state = SubscriptionState {
95                            conn_id,
96                            id_provider: &*id_provider.clone(),
97                            subscription_permit: p,
98                        };
99
100                        let fut = callback(id.clone(), params, sink, conn_state, extensions);
101                        ResponseFuture::future(fut)
102                    } else {
103                        let max = bounded_subscriptions.max();
104                        let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
105                        ResponseFuture::ready(rp)
106                    }
107                }
108                MethodCallback::Unsubscription(callback) => {
109                    // Don't adhere to any resource or subscription limits; always let unsubscribing
110                    // happen!
111
112                    let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
113                        tracing::warn!("Subscriptions not supported");
114                        let rp =
115                            MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
116                        return ResponseFuture::ready(rp);
117                    };
118
119                    let rp = callback(id, params, conn_id, max_response_body_size, extensions);
120                    ResponseFuture::ready(rp)
121                }
122            },
123        }
124    }
125}