reth_rpc_builder/
metrics.rs

1use jsonrpsee::{
2    core::middleware::{Batch, Notification},
3    server::middleware::rpc::RpcServiceT,
4    types::Request,
5    MethodResponse, RpcModule,
6};
7use reth_metrics::{
8    metrics::{Counter, Histogram},
9    Metrics,
10};
11use std::{
12    collections::HashMap,
13    future::Future,
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll},
17    time::Instant,
18};
19use tower::Layer;
20
/// Metrics for the RPC server.
///
/// Metrics are divided into two categories:
/// - Connection metrics: metrics for the connection (e.g. number of connections opened, relevant
///   for WS and IPC)
/// - Request metrics: metrics for each RPC method (e.g. number of calls started, time taken to
///   process a call)
///
/// All metric handles live behind an [`Arc`], so cloning this type is cheap and every clone
/// records into the same underlying metrics. This type is also the [`Layer`] that wraps a
/// service in a `RpcRequestMetricsService`.
#[derive(Default, Debug, Clone)]
pub(crate) struct RpcRequestMetrics {
    // Shared state: the per-transport connection metrics plus the per-method call metrics.
    inner: Arc<RpcServerMetricsInner>,
}
32
33impl RpcRequestMetrics {
34    pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
35        Self {
36            inner: Arc::new(RpcServerMetricsInner {
37                connection_metrics: transport.connection_metrics(),
38                call_metrics: module
39                    .method_names()
40                    .map(|method| {
41                        (method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
42                    })
43                    .collect(),
44            }),
45        }
46    }
47
48    /// Creates a new instance of the metrics layer for HTTP.
49    pub(crate) fn http(module: &RpcModule<()>) -> Self {
50        Self::new(module, RpcTransport::Http)
51    }
52
53    /// Creates a new instance of the metrics layer for same port.
54    ///
55    /// Note: currently it's not possible to track transport specific metrics for a server that runs http and ws on the same port: <https://github.com/paritytech/jsonrpsee/issues/1345> until we have this feature we will use the http metrics for this case.
56    pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
57        Self::http(module)
58    }
59
60    /// Creates a new instance of the metrics layer for Ws.
61    pub(crate) fn ws(module: &RpcModule<()>) -> Self {
62        Self::new(module, RpcTransport::WebSocket)
63    }
64
65    /// Creates a new instance of the metrics layer for Ws.
66    pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
67        Self::new(module, RpcTransport::Ipc)
68    }
69}
70
71impl<S> Layer<S> for RpcRequestMetrics {
72    type Service = RpcRequestMetricsService<S>;
73
74    fn layer(&self, inner: S) -> Self::Service {
75        RpcRequestMetricsService::new(inner, self.clone())
76    }
77}
78
/// Metrics for the RPC server.
///
/// Shared (via `Arc`) by every clone of the metrics layer. The call-metrics map is built once at
/// construction and only read afterwards in this file.
#[derive(Default, Clone, Debug)]
struct RpcServerMetricsInner {
    /// Connection metrics for the transport this server instance was created for
    connection_metrics: RpcServerConnectionMetrics,
    /// Call metrics per RPC method, keyed by method name; requests for unknown method names
    /// have no entry and are only counted in the connection metrics
    call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
}
87
88/// A [`RpcServiceT`] middleware that captures RPC metrics for the server.
89///
90/// This is created per connection and captures metrics for each request.
91#[derive(Clone, Debug)]
92pub struct RpcRequestMetricsService<S> {
93    /// The metrics collector for RPC requests
94    metrics: RpcRequestMetrics,
95    /// The inner service being wrapped
96    inner: S,
97}
98
99impl<S> RpcRequestMetricsService<S> {
100    pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
101        // this instance is kept alive for the duration of the connection
102        metrics.inner.connection_metrics.connections_opened_total.increment(1);
103        Self { inner: service, metrics }
104    }
105}
106
107impl<S> RpcServiceT for RpcRequestMetricsService<S>
108where
109    S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
110{
111    type MethodResponse = S::MethodResponse;
112    type NotificationResponse = S::NotificationResponse;
113    type BatchResponse = S::BatchResponse;
114
115    fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = S::MethodResponse> + Send + 'a {
116        self.metrics.inner.connection_metrics.requests_started_total.increment(1);
117        let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
118        if let Some((_, call_metrics)) = &call_metrics {
119            call_metrics.started_total.increment(1);
120        }
121        MeteredRequestFuture {
122            fut: self.inner.call(req),
123            started_at: Instant::now(),
124            metrics: self.metrics.clone(),
125            method: call_metrics.map(|(method, _)| *method),
126        }
127    }
128
129    fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
130        self.inner.batch(req)
131    }
132
133    fn notification<'a>(
134        &self,
135        n: Notification<'a>,
136    ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
137        self.inner.notification(n)
138    }
139}
140
141impl<S> Drop for RpcRequestMetricsService<S> {
142    fn drop(&mut self) {
143        // update connection metrics, connection closed
144        self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
145    }
146}
147
/// Response future to update the metrics for a single request/response pair.
///
/// Wraps the inner service's response future. When the inner future completes, it records the
/// connection-level finish count and latency. If the method is known, it also records the
/// per-method latency and the success/failure count.
#[pin_project::pin_project]
pub struct MeteredRequestFuture<F> {
    /// The wrapped response future (structurally pinned so it can be polled in place)
    #[pin]
    fut: F,
    /// time when the request started
    started_at: Instant,
    /// metrics for the method call
    metrics: RpcRequestMetrics,
    /// the method name if known, i.e. if it has an entry in the call-metrics map
    method: Option<&'static str>,
}
160
161impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.write_str("MeteredRequestFuture")
164    }
165}
166
167impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
168    type Output = F::Output;
169
170    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171        let this = self.project();
172
173        let res = this.fut.poll(cx);
174        if let Poll::Ready(resp) = &res {
175            let elapsed = this.started_at.elapsed().as_secs_f64();
176
177            // update transport metrics
178            this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
179            this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
180
181            // update call metrics
182            if let Some(call_metrics) =
183                this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
184            {
185                call_metrics.time_seconds.record(elapsed);
186                if resp.is_success() {
187                    call_metrics.successful_total.increment(1);
188                } else {
189                    call_metrics.failed_total.increment(1);
190                }
191            }
192        }
193        res
194    }
195}
196
/// The transport protocol used for the RPC connection.
///
/// Its string form is used as the `transport` label on the connection metrics.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum RpcTransport {
    /// HTTP (label `"http"`)
    Http,
    /// WebSocket (label `"ws"`)
    WebSocket,
    /// IPC (label `"ipc"`)
    Ipc,
}
204
205impl RpcTransport {
206    /// Returns the string representation of the transport protocol.
207    pub(crate) const fn as_str(&self) -> &'static str {
208        match self {
209            Self::Http => "http",
210            Self::WebSocket => "ws",
211            Self::Ipc => "ipc",
212        }
213    }
214
215    /// Returns the connection metrics for the transport protocol.
216    fn connection_metrics(&self) -> RpcServerConnectionMetrics {
217        RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
218    }
219}
220
221/// Metrics for the RPC connections
222#[derive(Metrics, Clone)]
223#[metrics(scope = "rpc_server.connections")]
224struct RpcServerConnectionMetrics {
225    /// The number of connections opened
226    connections_opened_total: Counter,
227    /// The number of connections closed
228    connections_closed_total: Counter,
229    /// The number of requests started
230    requests_started_total: Counter,
231    /// The number of requests finished
232    requests_finished_total: Counter,
233    /// Response for a single request/response pair
234    request_time_seconds: Histogram,
235}
236
237/// Metrics for the RPC calls
238#[derive(Metrics, Clone)]
239#[metrics(scope = "rpc_server.calls")]
240struct RpcServerCallMetrics {
241    /// The number of calls started
242    started_total: Counter,
243    /// The number of successful calls
244    successful_total: Counter,
245    /// The number of failed calls
246    failed_total: Counter,
247    /// Response for a single call
248    time_seconds: Histogram,
249}