reth_rpc_builder/
metrics.rs

1use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse, RpcModule};
2use reth_metrics::{
3    metrics::{Counter, Histogram},
4    Metrics,
5};
6use std::{
7    collections::HashMap,
8    future::Future,
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12    time::Instant,
13};
14use tower::Layer;
15
16/// Metrics for the RPC server.
17///
18/// Metrics are divided into two categories:
19/// - Connection metrics: metrics for the connection (e.g. number of connections opened, relevant
20///   for WS and IPC)
21/// - Request metrics: metrics for each RPC method (e.g. number of calls started, time taken to
22///   process a call)
23#[derive(Default, Debug, Clone)]
24pub(crate) struct RpcRequestMetrics {
25    inner: Arc<RpcServerMetricsInner>,
26}
27
28impl RpcRequestMetrics {
29    pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
30        Self {
31            inner: Arc::new(RpcServerMetricsInner {
32                connection_metrics: transport.connection_metrics(),
33                call_metrics: module
34                    .method_names()
35                    .map(|method| {
36                        (method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
37                    })
38                    .collect(),
39            }),
40        }
41    }
42
43    /// Creates a new instance of the metrics layer for HTTP.
44    pub(crate) fn http(module: &RpcModule<()>) -> Self {
45        Self::new(module, RpcTransport::Http)
46    }
47
48    /// Creates a new instance of the metrics layer for same port.
49    ///
50    /// Note: currently it's not possible to track transport specific metrics for a server that runs http and ws on the same port: <https://github.com/paritytech/jsonrpsee/issues/1345> until we have this feature we will use the http metrics for this case.
51    pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
52        Self::http(module)
53    }
54
55    /// Creates a new instance of the metrics layer for Ws.
56    pub(crate) fn ws(module: &RpcModule<()>) -> Self {
57        Self::new(module, RpcTransport::WebSocket)
58    }
59
60    /// Creates a new instance of the metrics layer for Ws.
61    #[allow(unused)]
62    pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
63        Self::new(module, RpcTransport::Ipc)
64    }
65}
66
67impl<S> Layer<S> for RpcRequestMetrics {
68    type Service = RpcRequestMetricsService<S>;
69
70    fn layer(&self, inner: S) -> Self::Service {
71        RpcRequestMetricsService::new(inner, self.clone())
72    }
73}
74
75/// Metrics for the RPC server
76#[derive(Default, Clone, Debug)]
77struct RpcServerMetricsInner {
78    /// Connection metrics per transport type
79    connection_metrics: RpcServerConnectionMetrics,
80    /// Call metrics per RPC method
81    call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
82}
83
84/// A [`RpcServiceT`] middleware that captures RPC metrics for the server.
85///
86/// This is created per connection and captures metrics for each request.
87#[derive(Clone, Debug)]
88pub struct RpcRequestMetricsService<S> {
89    /// The metrics collector for RPC requests
90    metrics: RpcRequestMetrics,
91    /// The inner service being wrapped
92    inner: S,
93}
94
95impl<S> RpcRequestMetricsService<S> {
96    pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
97        // this instance is kept alive for the duration of the connection
98        metrics.inner.connection_metrics.connections_opened_total.increment(1);
99        Self { inner: service, metrics }
100    }
101}
102
103impl<'a, S> RpcServiceT<'a> for RpcRequestMetricsService<S>
104where
105    S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
106{
107    type Future = MeteredRequestFuture<S::Future>;
108
109    fn call(&self, req: Request<'a>) -> Self::Future {
110        self.metrics.inner.connection_metrics.requests_started_total.increment(1);
111        let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
112        if let Some((_, call_metrics)) = &call_metrics {
113            call_metrics.started_total.increment(1);
114        }
115        MeteredRequestFuture {
116            fut: self.inner.call(req),
117            started_at: Instant::now(),
118            metrics: self.metrics.clone(),
119            method: call_metrics.map(|(method, _)| *method),
120        }
121    }
122}
123
124impl<S> Drop for RpcRequestMetricsService<S> {
125    fn drop(&mut self) {
126        // update connection metrics, connection closed
127        self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
128    }
129}
130
131/// Response future to update the metrics for a single request/response pair.
132#[pin_project::pin_project]
133pub struct MeteredRequestFuture<F> {
134    #[pin]
135    fut: F,
136    /// time when the request started
137    started_at: Instant,
138    /// metrics for the method call
139    metrics: RpcRequestMetrics,
140    /// the method name if known
141    method: Option<&'static str>,
142}
143
144impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.write_str("MeteredRequestFuture")
147    }
148}
149
150impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
151    type Output = F::Output;
152
153    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154        let this = self.project();
155
156        let res = this.fut.poll(cx);
157        if let Poll::Ready(resp) = &res {
158            let elapsed = this.started_at.elapsed().as_secs_f64();
159
160            // update transport metrics
161            this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
162            this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
163
164            // update call metrics
165            if let Some(call_metrics) =
166                this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
167            {
168                call_metrics.time_seconds.record(elapsed);
169                if resp.is_success() {
170                    call_metrics.successful_total.increment(1);
171                } else {
172                    call_metrics.failed_total.increment(1);
173                }
174            }
175        }
176        res
177    }
178}
179
/// Transport protocol an RPC connection is served over.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RpcTransport {
    /// Reported as `"http"`
    Http,
    /// Reported as `"ws"`
    WebSocket,
    /// Reported as `"ipc"`
    #[allow(unused)]
    Ipc,
}
188
189impl RpcTransport {
190    /// Returns the string representation of the transport protocol.
191    pub(crate) const fn as_str(&self) -> &'static str {
192        match self {
193            Self::Http => "http",
194            Self::WebSocket => "ws",
195            Self::Ipc => "ipc",
196        }
197    }
198
199    /// Returns the connection metrics for the transport protocol.
200    fn connection_metrics(&self) -> RpcServerConnectionMetrics {
201        RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
202    }
203}
204
205/// Metrics for the RPC connections
206#[derive(Metrics, Clone)]
207#[metrics(scope = "rpc_server.connections")]
208struct RpcServerConnectionMetrics {
209    /// The number of connections opened
210    connections_opened_total: Counter,
211    /// The number of connections closed
212    connections_closed_total: Counter,
213    /// The number of requests started
214    requests_started_total: Counter,
215    /// The number of requests finished
216    requests_finished_total: Counter,
217    /// Response for a single request/response pair
218    request_time_seconds: Histogram,
219}
220
221/// Metrics for the RPC calls
222#[derive(Metrics, Clone)]
223#[metrics(scope = "rpc_server.calls")]
224struct RpcServerCallMetrics {
225    /// The number of calls started
226    started_total: Counter,
227    /// The number of successful calls
228    successful_total: Counter,
229    /// The number of failed calls
230    failed_total: Counter,
231    /// Response for a single call
232    time_seconds: Histogram,
233}