reth_rpc_builder/
metrics.rs1use jsonrpsee::{
2 core::middleware::{Batch, Notification},
3 server::middleware::rpc::RpcServiceT,
4 types::Request,
5 MethodResponse, RpcModule,
6};
7use reth_metrics::{
8 metrics::{Counter, Histogram},
9 Metrics,
10};
11use std::{
12 collections::HashMap,
13 future::Future,
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll},
17 time::Instant,
18};
19use tower::Layer;
20
21#[derive(Default, Debug, Clone)]
29pub(crate) struct RpcRequestMetrics {
30 inner: Arc<RpcServerMetricsInner>,
31}
32
33impl RpcRequestMetrics {
34 pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
35 Self {
36 inner: Arc::new(RpcServerMetricsInner {
37 connection_metrics: transport.connection_metrics(),
38 call_metrics: module
39 .method_names()
40 .map(|method| {
41 (method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
42 })
43 .collect(),
44 }),
45 }
46 }
47
48 pub(crate) fn http(module: &RpcModule<()>) -> Self {
50 Self::new(module, RpcTransport::Http)
51 }
52
53 pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
57 Self::http(module)
58 }
59
60 pub(crate) fn ws(module: &RpcModule<()>) -> Self {
62 Self::new(module, RpcTransport::WebSocket)
63 }
64
65 pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
67 Self::new(module, RpcTransport::Ipc)
68 }
69}
70
71impl<S> Layer<S> for RpcRequestMetrics {
72 type Service = RpcRequestMetricsService<S>;
73
74 fn layer(&self, inner: S) -> Self::Service {
75 RpcRequestMetricsService::new(inner, self.clone())
76 }
77}
78
79#[derive(Default, Clone, Debug)]
81struct RpcServerMetricsInner {
82 connection_metrics: RpcServerConnectionMetrics,
84 call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
86}
87
88#[derive(Clone, Debug)]
92pub struct RpcRequestMetricsService<S> {
93 metrics: RpcRequestMetrics,
95 inner: S,
97}
98
99impl<S> RpcRequestMetricsService<S> {
100 pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
101 metrics.inner.connection_metrics.connections_opened_total.increment(1);
103 Self { inner: service, metrics }
104 }
105}
106
107impl<S> RpcServiceT for RpcRequestMetricsService<S>
108where
109 S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
110{
111 type MethodResponse = S::MethodResponse;
112 type NotificationResponse = S::NotificationResponse;
113 type BatchResponse = S::BatchResponse;
114
115 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = S::MethodResponse> + Send + 'a {
116 self.metrics.inner.connection_metrics.requests_started_total.increment(1);
117 let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
118 if let Some((_, call_metrics)) = &call_metrics {
119 call_metrics.started_total.increment(1);
120 }
121 MeteredRequestFuture {
122 fut: self.inner.call(req),
123 started_at: Instant::now(),
124 metrics: self.metrics.clone(),
125 method: call_metrics.map(|(method, _)| *method),
126 }
127 }
128
129 fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
130 self.inner.batch(req)
131 }
132
133 fn notification<'a>(
134 &self,
135 n: Notification<'a>,
136 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
137 self.inner.notification(n)
138 }
139}
140
141impl<S> Drop for RpcRequestMetricsService<S> {
142 fn drop(&mut self) {
143 self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
145 }
146}
147
148#[pin_project::pin_project]
150pub struct MeteredRequestFuture<F> {
151 #[pin]
152 fut: F,
153 started_at: Instant,
155 metrics: RpcRequestMetrics,
157 method: Option<&'static str>,
159}
160
161impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 f.write_str("MeteredRequestFuture")
164 }
165}
166
167impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
168 type Output = F::Output;
169
170 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 let this = self.project();
172
173 let res = this.fut.poll(cx);
174 if let Poll::Ready(resp) = &res {
175 let elapsed = this.started_at.elapsed().as_secs_f64();
176
177 this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
179 this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
180
181 if let Some(call_metrics) =
183 this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
184 {
185 call_metrics.time_seconds.record(elapsed);
186 if resp.is_success() {
187 call_metrics.successful_total.increment(1);
188 } else {
189 call_metrics.failed_total.increment(1);
190 }
191 }
192 }
193 res
194 }
195}
196
/// The transport an RPC server is reached over; used to label connection metrics.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RpcTransport {
    /// HTTP transport.
    Http,
    /// WebSocket transport.
    WebSocket,
    /// IPC transport.
    Ipc,
}
204
205impl RpcTransport {
206 pub(crate) const fn as_str(&self) -> &'static str {
208 match self {
209 Self::Http => "http",
210 Self::WebSocket => "ws",
211 Self::Ipc => "ipc",
212 }
213 }
214
215 fn connection_metrics(&self) -> RpcServerConnectionMetrics {
217 RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
218 }
219}
220
221#[derive(Metrics, Clone)]
223#[metrics(scope = "rpc_server.connections")]
224struct RpcServerConnectionMetrics {
225 connections_opened_total: Counter,
227 connections_closed_total: Counter,
229 requests_started_total: Counter,
231 requests_finished_total: Counter,
233 request_time_seconds: Histogram,
235}
236
237#[derive(Metrics, Clone)]
239#[metrics(scope = "rpc_server.calls")]
240struct RpcServerCallMetrics {
241 started_total: Counter,
243 successful_total: Counter,
245 failed_total: Counter,
247 time_seconds: Histogram,
249}