1use crate::error::{RpcError, ServerKind};
2use http::header::AUTHORIZATION;
3use jsonrpsee::{
4 core::RegisterMethodError,
5 http_client::{transport::HttpBackend, HeaderMap},
6 server::{AlreadyStoppedError, RpcModule},
7 Methods,
8};
9use reth_engine_primitives::EngineTypes;
10use reth_rpc_api::servers::*;
11use reth_rpc_eth_types::EthSubscriptionIdProvider;
12use reth_rpc_layer::{
13 secret_to_bearer_header, AuthClientLayer, AuthClientService, AuthLayer, JwtAuthValidator,
14 JwtSecret,
15};
16use reth_rpc_server_types::constants;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use tower::layer::util::Identity;
19
20pub use jsonrpsee::server::ServerBuilder;
21pub use reth_ipc::server::Builder as IpcServerBuilder;
22
23#[derive(Debug)]
25pub struct AuthServerConfig {
26 pub(crate) socket_addr: SocketAddr,
28 pub(crate) secret: JwtSecret,
30 pub(crate) server_config: ServerBuilder<Identity, Identity>,
32 pub(crate) ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
34 pub(crate) ipc_endpoint: Option<String>,
36}
37
38impl AuthServerConfig {
41 pub const fn builder(secret: JwtSecret) -> AuthServerConfigBuilder {
43 AuthServerConfigBuilder::new(secret)
44 }
45
46 pub const fn address(&self) -> SocketAddr {
48 self.socket_addr
49 }
50
51 pub async fn start(self, module: AuthRpcModule) -> Result<AuthServerHandle, RpcError> {
53 let Self { socket_addr, secret, server_config, ipc_server_config, ipc_endpoint } = self;
54
55 let middleware =
57 tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
58
59 let server = server_config
61 .set_http_middleware(middleware)
62 .build(socket_addr)
63 .await
64 .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
65
66 let local_addr = server
67 .local_addr()
68 .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
69
70 let handle = server.start(module.inner.clone());
71 let mut ipc_handle: Option<jsonrpsee::server::ServerHandle> = None;
72
73 if let Some(ipc_server_config) = ipc_server_config {
74 let ipc_endpoint_str = ipc_endpoint
75 .clone()
76 .unwrap_or_else(|| constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string());
77 let ipc_server = ipc_server_config.build(ipc_endpoint_str);
78 let res = ipc_server
79 .start(module.inner)
80 .await
81 .map_err(reth_ipc::server::IpcServerStartError::from)?;
82 ipc_handle = Some(res);
83 }
84
85 Ok(AuthServerHandle { handle, local_addr, secret, ipc_endpoint, ipc_handle })
86 }
87}
88
89#[derive(Debug)]
91pub struct AuthServerConfigBuilder {
92 socket_addr: Option<SocketAddr>,
93 secret: JwtSecret,
94 server_config: Option<ServerBuilder<Identity, Identity>>,
95 ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
96 ipc_endpoint: Option<String>,
97}
98
99impl AuthServerConfigBuilder {
102 pub const fn new(secret: JwtSecret) -> Self {
104 Self {
105 socket_addr: None,
106 secret,
107 server_config: None,
108 ipc_server_config: None,
109 ipc_endpoint: None,
110 }
111 }
112
113 pub const fn socket_addr(mut self, socket_addr: SocketAddr) -> Self {
115 self.socket_addr = Some(socket_addr);
116 self
117 }
118
119 pub const fn maybe_socket_addr(mut self, socket_addr: Option<SocketAddr>) -> Self {
121 self.socket_addr = socket_addr;
122 self
123 }
124
125 pub const fn secret(mut self, secret: JwtSecret) -> Self {
127 self.secret = secret;
128 self
129 }
130
131 pub fn with_server_config(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
136 self.server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
137 self
138 }
139
140 pub fn ipc_endpoint(mut self, ipc_endpoint: String) -> Self {
142 self.ipc_endpoint = Some(ipc_endpoint);
143 self
144 }
145
146 pub fn with_ipc_config(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
150 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
151 self
152 }
153
154 pub fn build(self) -> AuthServerConfig {
156 AuthServerConfig {
157 socket_addr: self.socket_addr.unwrap_or_else(|| {
158 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), constants::DEFAULT_AUTH_PORT)
159 }),
160 secret: self.secret,
161 server_config: self.server_config.unwrap_or_else(|| {
162 ServerBuilder::new()
163 .max_response_body_size(750 * 1024 * 1024)
167 .max_connections(500)
172 .max_request_body_size(128 * 1024 * 1024)
175 .set_id_provider(EthSubscriptionIdProvider::default())
176 }),
177 ipc_server_config: self.ipc_server_config.map(|ipc_server_config| {
178 ipc_server_config
179 .max_response_body_size(750 * 1024 * 1024)
180 .max_connections(500)
181 .max_request_body_size(128 * 1024 * 1024)
182 .set_id_provider(EthSubscriptionIdProvider::default())
183 }),
184 ipc_endpoint: self.ipc_endpoint,
185 }
186 }
187}
188
189#[derive(Debug, Clone)]
191pub struct AuthRpcModule {
192 pub(crate) inner: RpcModule<()>,
193}
194
195impl AuthRpcModule {
198 pub fn new<EngineApi, EngineT>(engine: EngineApi) -> Self
200 where
201 EngineT: EngineTypes,
202 EngineApi: EngineApiServer<EngineT>,
203 {
204 let mut module = RpcModule::new(());
205 module.merge(engine.into_rpc()).expect("No conflicting methods");
206 Self { inner: module }
207 }
208
209 pub fn module_mut(&mut self) -> &mut RpcModule<()> {
211 &mut self.inner
212 }
213
214 pub fn merge_auth_methods(
218 &mut self,
219 other: impl Into<Methods>,
220 ) -> Result<bool, RegisterMethodError> {
221 self.module_mut().merge(other.into()).map(|_| true)
222 }
223
224 pub fn remove_auth_method(&mut self, method_name: &'static str) -> bool {
228 self.module_mut().remove_method(method_name).is_some()
229 }
230
231 pub fn remove_auth_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
233 for name in methods {
234 self.remove_auth_method(name);
235 }
236 }
237
238 pub fn replace_auth_methods(
240 &mut self,
241 other: impl Into<Methods>,
242 ) -> Result<bool, RegisterMethodError> {
243 let other = other.into();
244 self.remove_auth_methods(other.method_names());
245 self.merge_auth_methods(other)
246 }
247
248 pub async fn start_server(
250 self,
251 config: AuthServerConfig,
252 ) -> Result<AuthServerHandle, RpcError> {
253 config.start(self).await
254 }
255}
256
257#[derive(Clone, Debug)]
262#[must_use = "Server stops if dropped"]
263pub struct AuthServerHandle {
264 local_addr: SocketAddr,
265 handle: jsonrpsee::server::ServerHandle,
266 secret: JwtSecret,
267 ipc_endpoint: Option<String>,
268 ipc_handle: Option<jsonrpsee::server::ServerHandle>,
269}
270
271impl AuthServerHandle {
274 pub const fn local_addr(&self) -> SocketAddr {
276 self.local_addr
277 }
278
279 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
281 self.handle.stop()
282 }
283
284 pub fn http_url(&self) -> String {
286 format!("http://{}", self.local_addr)
287 }
288
289 pub fn ws_url(&self) -> String {
291 format!("ws://{}", self.local_addr)
292 }
293
294 pub fn http_client(
296 &self,
297 ) -> jsonrpsee::http_client::HttpClient<AuthClientService<HttpBackend>> {
298 let secret_layer = AuthClientLayer::new(self.secret);
300 let middleware = tower::ServiceBuilder::default().layer(secret_layer);
301 jsonrpsee::http_client::HttpClientBuilder::default()
302 .set_http_middleware(middleware)
303 .build(self.http_url())
304 .expect("Failed to create http client")
305 }
306
307 pub async fn ws_client(&self) -> jsonrpsee::ws_client::WsClient {
310 jsonrpsee::ws_client::WsClientBuilder::default()
311 .set_headers(HeaderMap::from_iter([(
312 AUTHORIZATION,
313 secret_to_bearer_header(&self.secret),
314 )]))
315 .build(self.ws_url())
316 .await
317 .expect("Failed to create ws client")
318 }
319
320 #[cfg(unix)]
322 pub async fn ipc_client(&self) -> Option<jsonrpsee::async_client::Client> {
323 use reth_ipc::client::IpcClientBuilder;
324
325 if let Some(ipc_endpoint) = &self.ipc_endpoint {
326 return Some(
327 IpcClientBuilder::default()
328 .build(ipc_endpoint)
329 .await
330 .expect("Failed to create ipc client"),
331 )
332 }
333 None
334 }
335
336 pub fn ipc_handle(&self) -> Option<jsonrpsee::server::ServerHandle> {
338 self.ipc_handle.clone()
339 }
340
341 pub fn ipc_endpoint(&self) -> Option<String> {
343 self.ipc_endpoint.clone()
344 }
345}