reth_ipc/server/
rpc_service.rs1use futures::{
3 future::Either,
4 stream::{FuturesOrdered, StreamExt},
5};
6use jsonrpsee::{
7 core::middleware::{Batch, BatchEntry},
8 server::{
9 middleware::rpc::{ResponseFuture, RpcServiceT},
10 IdProvider,
11 },
12 types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Id, Request},
13 BatchResponse, BatchResponseBuilder, BoundedSubscriptions, ConnectionId, MethodCallback,
14 MethodResponse, MethodSink, Methods, SubscriptionState,
15};
16use std::{future::Future, sync::Arc};
17
18#[derive(Clone, Debug)]
20pub struct RpcService {
21 conn_id: ConnectionId,
22 methods: Methods,
23 max_response_body_size: usize,
24 cfg: RpcServiceCfg,
25}
26
27#[allow(dead_code)]
29#[derive(Clone, Debug)]
30pub(crate) enum RpcServiceCfg {
31 OnlyCalls,
33 CallsAndSubscriptions {
35 bounded_subscriptions: BoundedSubscriptions,
36 sink: MethodSink,
37 id_provider: Arc<dyn IdProvider>,
38 },
39}
40
41impl RpcService {
42 pub(crate) const fn new(
44 methods: Methods,
45 max_response_body_size: usize,
46 conn_id: ConnectionId,
47 cfg: RpcServiceCfg,
48 ) -> Self {
49 Self { methods, max_response_body_size, conn_id, cfg }
50 }
51}
52
53impl RpcServiceT for RpcService {
54 type MethodResponse = MethodResponse;
55 type NotificationResponse = Option<MethodResponse>;
56 type BatchResponse = BatchResponse;
57
58 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
59 let conn_id = self.conn_id;
60 let max_response_body_size = self.max_response_body_size;
61
62 let params = req.params();
63 let name = req.method_name();
64 let id = req.id().clone();
65 let extensions = req.extensions.clone();
66
67 match self.methods.method_with_name(name) {
68 None => {
69 let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
70 ResponseFuture::ready(rp)
71 }
72 Some((_name, method)) => match method {
73 MethodCallback::Sync(callback) => {
74 let rp = (callback)(id, params, max_response_body_size, extensions);
75 ResponseFuture::ready(rp)
76 }
77 MethodCallback::Async(callback) => {
78 let params = params.into_owned();
79 let id = id.into_owned();
80
81 let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
82 ResponseFuture::future(fut)
83 }
84 MethodCallback::Subscription(callback) => {
85 let RpcServiceCfg::CallsAndSubscriptions {
86 bounded_subscriptions,
87 sink,
88 id_provider,
89 } = &self.cfg
90 else {
91 tracing::warn!(id = ?id, method = %name, "Attempted subscription on a service not configured for subscriptions.");
92 let rp =
93 MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
94 return ResponseFuture::ready(rp);
95 };
96
97 if let Some(p) = bounded_subscriptions.acquire() {
98 let conn_state = SubscriptionState {
99 conn_id,
100 id_provider: &**id_provider,
101 subscription_permit: p,
102 };
103
104 let fut =
105 callback(id.clone(), params, sink.clone(), conn_state, extensions);
106 ResponseFuture::future(fut)
107 } else {
108 let max = bounded_subscriptions.max();
109 let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
110 ResponseFuture::ready(rp)
111 }
112 }
113 MethodCallback::Unsubscription(callback) => {
114 let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
118 tracing::warn!(id = ?id, method = %name, "Attempted unsubscription on a service not configured for subscriptions.");
119 let rp =
120 MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
121 return ResponseFuture::ready(rp);
122 };
123
124 let rp = callback(id, params, conn_id, max_response_body_size, extensions);
125 ResponseFuture::ready(rp)
126 }
127 },
128 }
129 }
130
131 fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
132 let entries: Vec<_> = req.into_iter().collect();
133
134 let mut got_notif = false;
135 let mut batch_response = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
136
137 let mut pending_calls: FuturesOrdered<_> = entries
138 .into_iter()
139 .filter_map(|v| match v {
140 Ok(BatchEntry::Call(call)) => Some(Either::Right(self.call(call))),
141 Ok(BatchEntry::Notification(_n)) => {
142 got_notif = true;
143 None
144 }
145 Err(_err) => Some(Either::Left(async {
146 MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
147 })),
148 })
149 .collect();
150 async move {
151 while let Some(response) = pending_calls.next().await {
152 if let Err(too_large) = batch_response.append(response) {
153 let mut error_batch = BatchResponseBuilder::new_with_limit(1);
154 let _ = error_batch.append(too_large);
155 return error_batch.finish();
156 }
157 }
158
159 batch_response.finish()
160 }
161 }
162
163 #[allow(clippy::manual_async_fn)]
164 fn notification<'a>(
165 &self,
166 _n: jsonrpsee::core::middleware::Notification<'a>,
167 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
168 async move { None }
169 }
170}