reth_rpc_builder/
metrics.rs1use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse, RpcModule};
2use reth_metrics::{
3 metrics::{Counter, Histogram},
4 Metrics,
5};
6use std::{
7 collections::HashMap,
8 future::Future,
9 pin::Pin,
10 sync::Arc,
11 task::{Context, Poll},
12 time::Instant,
13};
14use tower::Layer;
15
16#[derive(Default, Debug, Clone)]
24pub(crate) struct RpcRequestMetrics {
25 inner: Arc<RpcServerMetricsInner>,
26}
27
28impl RpcRequestMetrics {
29 pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
30 Self {
31 inner: Arc::new(RpcServerMetricsInner {
32 connection_metrics: transport.connection_metrics(),
33 call_metrics: module
34 .method_names()
35 .map(|method| {
36 (method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
37 })
38 .collect(),
39 }),
40 }
41 }
42
43 pub(crate) fn http(module: &RpcModule<()>) -> Self {
45 Self::new(module, RpcTransport::Http)
46 }
47
48 pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
52 Self::http(module)
53 }
54
55 pub(crate) fn ws(module: &RpcModule<()>) -> Self {
57 Self::new(module, RpcTransport::WebSocket)
58 }
59
60 #[allow(unused)]
62 pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
63 Self::new(module, RpcTransport::Ipc)
64 }
65}
66
67impl<S> Layer<S> for RpcRequestMetrics {
68 type Service = RpcRequestMetricsService<S>;
69
70 fn layer(&self, inner: S) -> Self::Service {
71 RpcRequestMetricsService::new(inner, self.clone())
72 }
73}
74
75#[derive(Default, Clone, Debug)]
77struct RpcServerMetricsInner {
78 connection_metrics: RpcServerConnectionMetrics,
80 call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
82}
83
84#[derive(Clone, Debug)]
88pub struct RpcRequestMetricsService<S> {
89 metrics: RpcRequestMetrics,
91 inner: S,
93}
94
95impl<S> RpcRequestMetricsService<S> {
96 pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
97 metrics.inner.connection_metrics.connections_opened_total.increment(1);
99 Self { inner: service, metrics }
100 }
101}
102
103impl<'a, S> RpcServiceT<'a> for RpcRequestMetricsService<S>
104where
105 S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
106{
107 type Future = MeteredRequestFuture<S::Future>;
108
109 fn call(&self, req: Request<'a>) -> Self::Future {
110 self.metrics.inner.connection_metrics.requests_started_total.increment(1);
111 let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
112 if let Some((_, call_metrics)) = &call_metrics {
113 call_metrics.started_total.increment(1);
114 }
115 MeteredRequestFuture {
116 fut: self.inner.call(req),
117 started_at: Instant::now(),
118 metrics: self.metrics.clone(),
119 method: call_metrics.map(|(method, _)| *method),
120 }
121 }
122}
123
124impl<S> Drop for RpcRequestMetricsService<S> {
125 fn drop(&mut self) {
126 self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
128 }
129}
130
131#[pin_project::pin_project]
133pub struct MeteredRequestFuture<F> {
134 #[pin]
135 fut: F,
136 started_at: Instant,
138 metrics: RpcRequestMetrics,
140 method: Option<&'static str>,
142}
143
144impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.write_str("MeteredRequestFuture")
147 }
148}
149
150impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
151 type Output = F::Output;
152
153 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154 let this = self.project();
155
156 let res = this.fut.poll(cx);
157 if let Poll::Ready(resp) = &res {
158 let elapsed = this.started_at.elapsed().as_secs_f64();
159
160 this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
162 this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
163
164 if let Some(call_metrics) =
166 this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
167 {
168 call_metrics.time_seconds.record(elapsed);
169 if resp.is_success() {
170 call_metrics.successful_total.increment(1);
171 } else {
172 call_metrics.failed_total.increment(1);
173 }
174 }
175 }
176 res
177 }
178}
179
180#[derive(Debug, Clone, Copy, Eq, PartialEq)]
182pub(crate) enum RpcTransport {
183 Http,
184 WebSocket,
185 #[allow(unused)]
186 Ipc,
187}
188
189impl RpcTransport {
190 pub(crate) const fn as_str(&self) -> &'static str {
192 match self {
193 Self::Http => "http",
194 Self::WebSocket => "ws",
195 Self::Ipc => "ipc",
196 }
197 }
198
199 fn connection_metrics(&self) -> RpcServerConnectionMetrics {
201 RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
202 }
203}
204
205#[derive(Metrics, Clone)]
207#[metrics(scope = "rpc_server.connections")]
208struct RpcServerConnectionMetrics {
209 connections_opened_total: Counter,
211 connections_closed_total: Counter,
213 requests_started_total: Counter,
215 requests_finished_total: Counter,
217 request_time_seconds: Histogram,
219}
220
221#[derive(Metrics, Clone)]
223#[metrics(scope = "rpc_server.calls")]
224struct RpcServerCallMetrics {
225 started_total: Counter,
227 successful_total: Counter,
229 failed_total: Counter,
231 time_seconds: Histogram,
233}