reth_ipc/server/
rpc_service.rs

1//! JSON-RPC service middleware.
2use futures::{
3    future::Either,
4    stream::{FuturesOrdered, StreamExt},
5};
6use jsonrpsee::{
7    core::middleware::{Batch, BatchEntry},
8    server::{
9        middleware::rpc::{ResponseFuture, RpcServiceT},
10        IdProvider,
11    },
12    types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Id, Request},
13    BatchResponse, BatchResponseBuilder, BoundedSubscriptions, ConnectionId, MethodCallback,
14    MethodResponse, MethodSink, Methods, SubscriptionState,
15};
16use std::{future::Future, sync::Arc};
17
/// JSON-RPC service middleware.
///
/// Bundles what is needed to dispatch a request to a registered method callback:
/// a connection id, the method table, a response size limit and the
/// call/subscription configuration.
#[derive(Clone, Debug)]
pub struct RpcService {
    /// Connection id handed to async, subscription and unsubscription callbacks.
    conn_id: ConnectionId,
    /// Registered methods; every call looks its callback up here by method name.
    methods: Methods,
    /// Size limit passed to method callbacks and used as the limit of the batch
    /// response builder.
    max_response_body_size: usize,
    /// Decides whether subscription/unsubscription callbacks may run and carries
    /// the state they need.
    cfg: RpcServiceCfg,
}
26
/// Configuration of the `RpcService`.
///
/// Selects whether subscription and unsubscription callbacks are allowed to run.
// NOTE(review): no reason is recorded for this `allow`; presumably one of the
// variants is never constructed in this crate — confirm it is still needed.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub(crate) enum RpcServiceCfg {
    /// The server supports only calls.
    ///
    /// With this configuration, subscription and unsubscription requests are
    /// answered with an internal error.
    OnlyCalls,
    /// The server supports both method calls and subscriptions.
    CallsAndSubscriptions {
        /// Source of subscription permits; a subscription request is rejected when
        /// `acquire` yields no permit.
        bounded_subscriptions: BoundedSubscriptions,
        /// Sink that is cloned and handed to every subscription callback.
        sink: MethodSink,
        /// Id provider lent to subscription callbacks through `SubscriptionState`.
        id_provider: Arc<dyn IdProvider>,
    },
}
40
41impl RpcService {
42    /// Create a new service.
43    pub(crate) const fn new(
44        methods: Methods,
45        max_response_body_size: usize,
46        conn_id: ConnectionId,
47        cfg: RpcServiceCfg,
48    ) -> Self {
49        Self { methods, max_response_body_size, conn_id, cfg }
50    }
51}
52
53impl RpcServiceT for RpcService {
54    type MethodResponse = MethodResponse;
55    type NotificationResponse = Option<MethodResponse>;
56    type BatchResponse = BatchResponse;
57
58    fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
59        let conn_id = self.conn_id;
60        let max_response_body_size = self.max_response_body_size;
61
62        let params = req.params();
63        let name = req.method_name();
64        let id = req.id().clone();
65        let extensions = req.extensions.clone();
66
67        match self.methods.method_with_name(name) {
68            None => {
69                let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
70                ResponseFuture::ready(rp)
71            }
72            Some((_name, method)) => match method {
73                MethodCallback::Sync(callback) => {
74                    let rp = (callback)(id, params, max_response_body_size, extensions);
75                    ResponseFuture::ready(rp)
76                }
77                MethodCallback::Async(callback) => {
78                    let params = params.into_owned();
79                    let id = id.into_owned();
80
81                    let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
82                    ResponseFuture::future(fut)
83                }
84                MethodCallback::Subscription(callback) => {
85                    let RpcServiceCfg::CallsAndSubscriptions {
86                        bounded_subscriptions,
87                        sink,
88                        id_provider,
89                    } = &self.cfg
90                    else {
91                        tracing::warn!(id = ?id, method = %name, "Attempted subscription on a service not configured for subscriptions.");
92                        let rp =
93                            MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
94                        return ResponseFuture::ready(rp);
95                    };
96
97                    if let Some(p) = bounded_subscriptions.acquire() {
98                        let conn_state = SubscriptionState {
99                            conn_id,
100                            id_provider: &**id_provider,
101                            subscription_permit: p,
102                        };
103
104                        let fut =
105                            callback(id.clone(), params, sink.clone(), conn_state, extensions);
106                        ResponseFuture::future(fut)
107                    } else {
108                        let max = bounded_subscriptions.max();
109                        let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
110                        ResponseFuture::ready(rp)
111                    }
112                }
113                MethodCallback::Unsubscription(callback) => {
114                    // Don't adhere to any resource or subscription limits; always let unsubscribing
115                    // happen!
116
117                    let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
118                        tracing::warn!(id = ?id, method = %name, "Attempted unsubscription on a service not configured for subscriptions.");
119                        let rp =
120                            MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
121                        return ResponseFuture::ready(rp);
122                    };
123
124                    let rp = callback(id, params, conn_id, max_response_body_size, extensions);
125                    ResponseFuture::ready(rp)
126                }
127            },
128        }
129    }
130
131    fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
132        let entries: Vec<_> = req.into_iter().collect();
133
134        let mut got_notif = false;
135        let mut batch_response = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
136
137        let mut pending_calls: FuturesOrdered<_> = entries
138            .into_iter()
139            .filter_map(|v| match v {
140                Ok(BatchEntry::Call(call)) => Some(Either::Right(self.call(call))),
141                Ok(BatchEntry::Notification(_n)) => {
142                    got_notif = true;
143                    None
144                }
145                Err(_err) => Some(Either::Left(async {
146                    MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
147                })),
148            })
149            .collect();
150        async move {
151            while let Some(response) = pending_calls.next().await {
152                if let Err(too_large) = batch_response.append(response) {
153                    let mut error_batch = BatchResponseBuilder::new_with_limit(1);
154                    let _ = error_batch.append(too_large);
155                    return error_batch.finish();
156                }
157            }
158
159            batch_response.finish()
160        }
161    }
162
163    #[allow(clippy::manual_async_fn)]
164    fn notification<'a>(
165        &self,
166        _n: jsonrpsee::core::middleware::Notification<'a>,
167    ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
168        async move { None }
169    }
170}