reth_rpc_builder/
rate_limiter.rs1use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request};
4use std::{
5 future::Future,
6 pin::Pin,
7 sync::Arc,
8 task::{ready, Context, Poll},
9};
10use tokio::sync::{OwnedSemaphorePermit, Semaphore};
11use tokio_util::sync::PollSemaphore;
12use tower::Layer;
13
14#[derive(Debug, Clone)]
18pub struct RpcRequestRateLimiter {
19 inner: Arc<RpcRequestRateLimiterInner>,
20}
21
22impl RpcRequestRateLimiter {
23 pub fn new(rate_limit: usize) -> Self {
25 Self {
26 inner: Arc::new(RpcRequestRateLimiterInner {
27 call_guard: PollSemaphore::new(Arc::new(Semaphore::new(rate_limit))),
28 }),
29 }
30 }
31}
32
33impl<S> Layer<S> for RpcRequestRateLimiter {
34 type Service = RpcRequestRateLimitingService<S>;
35
36 fn layer(&self, inner: S) -> Self::Service {
37 RpcRequestRateLimitingService::new(inner, self.clone())
38 }
39}
40
41#[derive(Debug, Clone)]
43struct RpcRequestRateLimiterInner {
44 call_guard: PollSemaphore,
46}
47
48#[derive(Debug, Clone)]
50pub struct RpcRequestRateLimitingService<S> {
51 rate_limiter: RpcRequestRateLimiter,
53 inner: S,
55}
56
57impl<S> RpcRequestRateLimitingService<S> {
58 pub const fn new(service: S, rate_limiter: RpcRequestRateLimiter) -> Self {
60 Self { inner: service, rate_limiter }
61 }
62}
63
64impl<S> RpcServiceT for RpcRequestRateLimitingService<S>
65where
66 S: RpcServiceT + Send + Sync + Clone + 'static,
67{
68 type MethodResponse = S::MethodResponse;
69 type NotificationResponse = S::NotificationResponse;
70 type BatchResponse = S::BatchResponse;
71
72 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
73 let method_name = req.method_name();
74 if method_name.starts_with("trace_") || method_name.starts_with("debug_") {
75 RateLimitingRequestFuture {
76 fut: self.inner.call(req),
77 guard: Some(self.rate_limiter.inner.call_guard.clone()),
78 permit: None,
79 }
80 } else {
81 RateLimitingRequestFuture { fut: self.inner.call(req), guard: None, permit: None }
84 }
85 }
86
87 fn batch<'a>(
88 &self,
89 requests: jsonrpsee::core::middleware::Batch<'a>,
90 ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
91 self.inner.batch(requests)
92 }
93
94 fn notification<'a>(
95 &self,
96 n: jsonrpsee::core::middleware::Notification<'a>,
97 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
98 self.inner.notification(n)
99 }
100}
101
102#[pin_project::pin_project]
104pub struct RateLimitingRequestFuture<F> {
105 #[pin]
106 fut: F,
107 guard: Option<PollSemaphore>,
108 permit: Option<OwnedSemaphorePermit>,
109}
110
111impl<F> std::fmt::Debug for RateLimitingRequestFuture<F> {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.write_str("RateLimitingRequestFuture")
114 }
115}
116
117impl<F: Future> Future for RateLimitingRequestFuture<F> {
118 type Output = F::Output;
119
120 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121 let this = self.project();
122 if let Some(guard) = this.guard.as_mut() {
123 *this.permit = ready!(guard.poll_acquire(cx));
124 *this.guard = None;
125 }
126 let res = this.fut.poll(cx);
127 if res.is_ready() {
128 *this.permit = None;
129 }
130 res
131 }
132}