reth_rpc_builder/
auth.rs

1use crate::error::{RpcError, ServerKind};
2use http::header::AUTHORIZATION;
3use jsonrpsee::{
4    core::RegisterMethodError,
5    http_client::{transport::HttpBackend, HeaderMap},
6    server::{AlreadyStoppedError, RpcModule},
7    Methods,
8};
9use reth_engine_primitives::EngineTypes;
10use reth_rpc_api::servers::*;
11use reth_rpc_eth_types::EthSubscriptionIdProvider;
12use reth_rpc_layer::{
13    secret_to_bearer_header, AuthClientLayer, AuthClientService, AuthLayer, JwtAuthValidator,
14    JwtSecret,
15};
16use reth_rpc_server_types::constants;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use tower::layer::util::Identity;
19
20pub use jsonrpsee::server::ServerBuilder;
21pub use reth_ipc::server::Builder as IpcServerBuilder;
22
23/// Server configuration for the auth server.
24#[derive(Debug)]
25pub struct AuthServerConfig {
26    /// Where the server should listen.
27    pub(crate) socket_addr: SocketAddr,
28    /// The secret for the auth layer of the server.
29    pub(crate) secret: JwtSecret,
30    /// Configs for JSON-RPC Http.
31    pub(crate) server_config: ServerBuilder<Identity, Identity>,
32    /// Configs for IPC server
33    pub(crate) ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
34    /// IPC endpoint
35    pub(crate) ipc_endpoint: Option<String>,
36}
37
38// === impl AuthServerConfig ===
39
40impl AuthServerConfig {
41    /// Convenience function to create a new `AuthServerConfig`.
42    pub const fn builder(secret: JwtSecret) -> AuthServerConfigBuilder {
43        AuthServerConfigBuilder::new(secret)
44    }
45
46    /// Returns the address the server will listen on.
47    pub const fn address(&self) -> SocketAddr {
48        self.socket_addr
49    }
50
51    /// Convenience function to start a server in one step.
52    pub async fn start(self, module: AuthRpcModule) -> Result<AuthServerHandle, RpcError> {
53        let Self { socket_addr, secret, server_config, ipc_server_config, ipc_endpoint } = self;
54
55        // Create auth middleware.
56        let middleware =
57            tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
58
59        // By default, both http and ws are enabled.
60        let server = server_config
61            .set_http_middleware(middleware)
62            .build(socket_addr)
63            .await
64            .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
65
66        let local_addr = server
67            .local_addr()
68            .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
69
70        let handle = server.start(module.inner.clone());
71        let mut ipc_handle: Option<jsonrpsee::server::ServerHandle> = None;
72
73        if let Some(ipc_server_config) = ipc_server_config {
74            let ipc_endpoint_str = ipc_endpoint
75                .clone()
76                .unwrap_or_else(|| constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string());
77            let ipc_server = ipc_server_config.build(ipc_endpoint_str);
78            let res = ipc_server
79                .start(module.inner)
80                .await
81                .map_err(reth_ipc::server::IpcServerStartError::from)?;
82            ipc_handle = Some(res);
83        }
84
85        Ok(AuthServerHandle { handle, local_addr, secret, ipc_endpoint, ipc_handle })
86    }
87}
88
89/// Builder type for configuring an `AuthServerConfig`.
90#[derive(Debug)]
91pub struct AuthServerConfigBuilder {
92    socket_addr: Option<SocketAddr>,
93    secret: JwtSecret,
94    server_config: Option<ServerBuilder<Identity, Identity>>,
95    ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
96    ipc_endpoint: Option<String>,
97}
98
99// === impl AuthServerConfigBuilder ===
100
101impl AuthServerConfigBuilder {
102    /// Create a new `AuthServerConfigBuilder` with the given `secret`.
103    pub const fn new(secret: JwtSecret) -> Self {
104        Self {
105            socket_addr: None,
106            secret,
107            server_config: None,
108            ipc_server_config: None,
109            ipc_endpoint: None,
110        }
111    }
112
113    /// Set the socket address for the server.
114    pub const fn socket_addr(mut self, socket_addr: SocketAddr) -> Self {
115        self.socket_addr = Some(socket_addr);
116        self
117    }
118
119    /// Set the socket address for the server.
120    pub const fn maybe_socket_addr(mut self, socket_addr: Option<SocketAddr>) -> Self {
121        self.socket_addr = socket_addr;
122        self
123    }
124
125    /// Set the secret for the server.
126    pub const fn secret(mut self, secret: JwtSecret) -> Self {
127        self.secret = secret;
128        self
129    }
130
131    /// Configures the JSON-RPC server
132    ///
133    /// Note: this always configures an [`EthSubscriptionIdProvider`]
134    /// [`IdProvider`](jsonrpsee::server::IdProvider) for convenience.
135    pub fn with_server_config(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
136        self.server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
137        self
138    }
139
140    /// Set the ipc endpoint for the server.
141    pub fn ipc_endpoint(mut self, ipc_endpoint: String) -> Self {
142        self.ipc_endpoint = Some(ipc_endpoint);
143        self
144    }
145
146    /// Configures the IPC server
147    ///
148    /// Note: this always configures an [`EthSubscriptionIdProvider`]
149    pub fn with_ipc_config(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
150        self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
151        self
152    }
153
154    /// Build the `AuthServerConfig`.
155    pub fn build(self) -> AuthServerConfig {
156        AuthServerConfig {
157            socket_addr: self.socket_addr.unwrap_or_else(|| {
158                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), constants::DEFAULT_AUTH_PORT)
159            }),
160            secret: self.secret,
161            server_config: self.server_config.unwrap_or_else(|| {
162                ServerBuilder::new()
163                    // This needs to large enough to handle large eth_getLogs responses and maximum
164                    // payload bodies limit for `engine_getPayloadBodiesByRangeV`
165                    // ~750MB per response should be enough
166                    .max_response_body_size(750 * 1024 * 1024)
167                    // Connections to this server are always authenticated, hence this only affects
168                    // connections from the CL or any other client that uses JWT, this should be
169                    // more than enough so that the CL (or multiple CL nodes) will never get rate
170                    // limited
171                    .max_connections(500)
172                    // bump the default request size slightly, there aren't any methods exposed with
173                    // dynamic request params that can exceed this
174                    .max_request_body_size(128 * 1024 * 1024)
175                    .set_id_provider(EthSubscriptionIdProvider::default())
176            }),
177            ipc_server_config: self.ipc_server_config.map(|ipc_server_config| {
178                ipc_server_config
179                    .max_response_body_size(750 * 1024 * 1024)
180                    .max_connections(500)
181                    .max_request_body_size(128 * 1024 * 1024)
182                    .set_id_provider(EthSubscriptionIdProvider::default())
183            }),
184            ipc_endpoint: self.ipc_endpoint,
185        }
186    }
187}
188
189/// Holds installed modules for the auth server.
190#[derive(Debug, Clone)]
191pub struct AuthRpcModule {
192    pub(crate) inner: RpcModule<()>,
193}
194
195// === impl AuthRpcModule ===
196
197impl AuthRpcModule {
198    /// Create a new `AuthRpcModule` with the given `engine_api`.
199    pub fn new<EngineApi, EngineT>(engine: EngineApi) -> Self
200    where
201        EngineT: EngineTypes,
202        EngineApi: EngineApiServer<EngineT>,
203    {
204        let mut module = RpcModule::new(());
205        module.merge(engine.into_rpc()).expect("No conflicting methods");
206        Self { inner: module }
207    }
208
209    /// Get a reference to the inner `RpcModule`.
210    pub fn module_mut(&mut self) -> &mut RpcModule<()> {
211        &mut self.inner
212    }
213
214    /// Merge the given [Methods] in the configured authenticated methods.
215    ///
216    /// Fails if any of the methods in other is present already.
217    pub fn merge_auth_methods(
218        &mut self,
219        other: impl Into<Methods>,
220    ) -> Result<bool, RegisterMethodError> {
221        self.module_mut().merge(other.into()).map(|_| true)
222    }
223
224    /// Removes the method with the given name from the configured authenticated methods.
225    ///
226    /// Returns `true` if the method was found and removed, `false` otherwise.
227    pub fn remove_auth_method(&mut self, method_name: &'static str) -> bool {
228        self.module_mut().remove_method(method_name).is_some()
229    }
230
231    /// Removes the given methods from the configured authenticated methods.
232    pub fn remove_auth_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
233        for name in methods {
234            self.remove_auth_method(name);
235        }
236    }
237
238    /// Replace the given [Methods] in the configured authenticated methods.
239    pub fn replace_auth_methods(
240        &mut self,
241        other: impl Into<Methods>,
242    ) -> Result<bool, RegisterMethodError> {
243        let other = other.into();
244        self.remove_auth_methods(other.method_names());
245        self.merge_auth_methods(other)
246    }
247
248    /// Convenience function for starting a server
249    pub async fn start_server(
250        self,
251        config: AuthServerConfig,
252    ) -> Result<AuthServerHandle, RpcError> {
253        config.start(self).await
254    }
255}
256
257/// A handle to the spawned auth server.
258///
259/// When this type is dropped or [`AuthServerHandle::stop`] has been called the server will be
260/// stopped.
261#[derive(Clone, Debug)]
262#[must_use = "Server stops if dropped"]
263pub struct AuthServerHandle {
264    local_addr: SocketAddr,
265    handle: jsonrpsee::server::ServerHandle,
266    secret: JwtSecret,
267    ipc_endpoint: Option<String>,
268    ipc_handle: Option<jsonrpsee::server::ServerHandle>,
269}
270
271// === impl AuthServerHandle ===
272
273impl AuthServerHandle {
274    /// Returns the [`SocketAddr`] of the http server if started.
275    pub const fn local_addr(&self) -> SocketAddr {
276        self.local_addr
277    }
278
279    /// Tell the server to stop without waiting for the server to stop.
280    pub fn stop(self) -> Result<(), AlreadyStoppedError> {
281        self.handle.stop()
282    }
283
284    /// Returns the url to the http server
285    pub fn http_url(&self) -> String {
286        format!("http://{}", self.local_addr)
287    }
288
289    /// Returns the url to the ws server
290    pub fn ws_url(&self) -> String {
291        format!("ws://{}", self.local_addr)
292    }
293
294    /// Returns a http client connected to the server.
295    pub fn http_client(
296        &self,
297    ) -> jsonrpsee::http_client::HttpClient<AuthClientService<HttpBackend>> {
298        // Create a middleware that adds a new JWT token to every request.
299        let secret_layer = AuthClientLayer::new(self.secret);
300        let middleware = tower::ServiceBuilder::default().layer(secret_layer);
301        jsonrpsee::http_client::HttpClientBuilder::default()
302            .set_http_middleware(middleware)
303            .build(self.http_url())
304            .expect("Failed to create http client")
305    }
306
307    /// Returns a ws client connected to the server. Note that the connection can only be
308    /// be established within 1 minute due to the JWT token expiration.
309    pub async fn ws_client(&self) -> jsonrpsee::ws_client::WsClient {
310        jsonrpsee::ws_client::WsClientBuilder::default()
311            .set_headers(HeaderMap::from_iter([(
312                AUTHORIZATION,
313                secret_to_bearer_header(&self.secret),
314            )]))
315            .build(self.ws_url())
316            .await
317            .expect("Failed to create ws client")
318    }
319
320    /// Returns an ipc client connected to the server.
321    #[cfg(unix)]
322    pub async fn ipc_client(&self) -> Option<jsonrpsee::async_client::Client> {
323        use reth_ipc::client::IpcClientBuilder;
324
325        if let Some(ipc_endpoint) = &self.ipc_endpoint {
326            return Some(
327                IpcClientBuilder::default()
328                    .build(ipc_endpoint)
329                    .await
330                    .expect("Failed to create ipc client"),
331            )
332        }
333        None
334    }
335
336    /// Returns an ipc handle
337    pub fn ipc_handle(&self) -> Option<jsonrpsee::server::ServerHandle> {
338        self.ipc_handle.clone()
339    }
340
341    /// Return an ipc endpoint
342    pub fn ipc_endpoint(&self) -> Option<String> {
343        self.ipc_endpoint.clone()
344    }
345}