1use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7 pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8 Filter, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12 server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_primitives_traits::NodePrimitives;
17use reth_rpc_eth_api::{
18 pubsub::EthPubSubApiServer, EthApiTypes, RpcNodeCore, RpcTransaction, TransactionCompat,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27 wrappers::{BroadcastStream, ReceiverStream},
28 Stream,
29};
30use tracing::error;
31
32#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37 inner: Arc<EthPubSubInner<Eth>>,
39}
40
41impl<Eth> EthPubSub<Eth> {
44 pub fn new(eth_api: Eth) -> Self {
48 Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
49 }
50
51 pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
53 let inner = EthPubSubInner { eth_api, subscription_task_spawner };
54 Self { inner: Arc::new(inner) }
55 }
56}
57
58#[async_trait::async_trait]
59impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
60where
61 Eth: RpcNodeCore<
62 Provider: BlockNumReader + CanonStateSubscriptions,
63 Pool: TransactionPool,
64 Network: NetworkInfo,
65 > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
66 + 'static,
67{
68 async fn subscribe(
70 &self,
71 pending: PendingSubscriptionSink,
72 kind: SubscriptionKind,
73 params: Option<Params>,
74 ) -> jsonrpsee::core::SubscriptionResult {
75 let sink = pending.accept().await?;
76 let pubsub = self.inner.clone();
77 self.inner.subscription_task_spawner.spawn(Box::pin(async move {
78 let _ = handle_accepted(pubsub, sink, kind, params).await;
79 }));
80
81 Ok(())
82 }
83}
84
85async fn handle_accepted<Eth>(
87 pubsub: Arc<EthPubSubInner<Eth>>,
88 accepted_sink: SubscriptionSink,
89 kind: SubscriptionKind,
90 params: Option<Params>,
91) -> Result<(), ErrorObject<'static>>
92where
93 Eth: RpcNodeCore<
94 Provider: BlockNumReader + CanonStateSubscriptions,
95 Pool: TransactionPool,
96 Network: NetworkInfo,
97 > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
98{
99 match kind {
100 SubscriptionKind::NewHeads => {
101 pipe_from_stream(accepted_sink, pubsub.new_headers_stream()).await
102 }
103 SubscriptionKind::Logs => {
104 let filter = match params {
106 Some(Params::Logs(filter)) => *filter,
107 Some(Params::Bool(_)) => {
108 return Err(invalid_params_rpc_err("Invalid params for logs"))
109 }
110 _ => Default::default(),
111 };
112 pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
113 }
114 SubscriptionKind::NewPendingTransactions => {
115 if let Some(params) = params {
116 match params {
117 Params::Bool(true) => {
118 let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
120 let tx_value = match pubsub
121 .eth_api
122 .tx_resp_builder()
123 .fill_pending(tx.transaction.to_consensus())
124 {
125 Ok(tx) => Some(tx),
126 Err(err) => {
127 error!(target = "rpc",
128 %err,
129 "Failed to fill transaction with block context"
130 );
131 None
132 }
133 };
134 std::future::ready(tx_value)
135 });
136 return pipe_from_stream(accepted_sink, stream).await
137 }
138 Params::Bool(false) | Params::None => {
139 }
141 Params::Logs(_) => {
142 return Err(invalid_params_rpc_err(
143 "Invalid params for newPendingTransactions",
144 ))
145 }
146 }
147 }
148
149 pipe_from_stream(accepted_sink, pubsub.pending_transaction_hashes_stream()).await
150 }
151 SubscriptionKind::Syncing => {
152 let mut canon_state =
154 BroadcastStream::new(pubsub.eth_api.provider().subscribe_to_canonical_state());
155 let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
157 let current_sub_res = pubsub.sync_status(initial_sync_status);
158
159 let msg = SubscriptionMessage::new(
161 accepted_sink.method_name(),
162 accepted_sink.subscription_id(),
163 ¤t_sub_res,
164 )
165 .map_err(SubscriptionSerializeError::new)?;
166
167 if accepted_sink.send(msg).await.is_err() {
168 return Ok(())
169 }
170
171 while canon_state.next().await.is_some() {
172 let current_syncing = pubsub.eth_api.network().is_syncing();
173 if current_syncing != initial_sync_status {
175 initial_sync_status = current_syncing;
177
178 let sync_status = pubsub.sync_status(current_syncing);
180 let msg = SubscriptionMessage::new(
181 accepted_sink.method_name(),
182 accepted_sink.subscription_id(),
183 &sync_status,
184 )
185 .map_err(SubscriptionSerializeError::new)?;
186
187 if accepted_sink.send(msg).await.is_err() {
188 break
189 }
190 }
191 }
192
193 Ok(())
194 }
195 }
196}
197
198#[derive(Debug, thiserror::Error)]
200#[error("Failed to serialize subscription item: {0}")]
201pub struct SubscriptionSerializeError(#[from] serde_json::Error);
202
203impl SubscriptionSerializeError {
204 const fn new(err: serde_json::Error) -> Self {
205 Self(err)
206 }
207}
208
209impl From<SubscriptionSerializeError> for ErrorObject<'static> {
210 fn from(value: SubscriptionSerializeError) -> Self {
211 internal_rpc_err(value.to_string())
212 }
213}
214
215async fn pipe_from_stream<T, St>(
217 sink: SubscriptionSink,
218 mut stream: St,
219) -> Result<(), ErrorObject<'static>>
220where
221 St: Stream<Item = T> + Unpin,
222 T: Serialize,
223{
224 loop {
225 tokio::select! {
226 _ = sink.closed() => {
227 break Ok(())
229 },
230 maybe_item = stream.next() => {
231 let item = match maybe_item {
232 Some(item) => item,
233 None => {
234 break Ok(())
236 },
237 };
238 let msg = SubscriptionMessage::new(
239 sink.method_name(),
240 sink.subscription_id(),
241 &item
242 ).map_err(SubscriptionSerializeError::new)?;
243
244 if sink.send(msg).await.is_err() {
245 break Ok(());
246 }
247 }
248 }
249 }
250}
251
252impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 f.debug_struct("EthPubSub").finish_non_exhaustive()
255 }
256}
257
258#[derive(Clone)]
260struct EthPubSubInner<EthApi> {
261 eth_api: EthApi,
263 subscription_task_spawner: Box<dyn TaskSpawner>,
265}
266
267impl<Eth> EthPubSubInner<Eth>
270where
271 Eth: RpcNodeCore<Provider: BlockNumReader>,
272{
273 fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
275 if is_syncing {
276 let current_block = self
277 .eth_api
278 .provider()
279 .chain_info()
280 .map(|info| info.best_number)
281 .unwrap_or_default();
282 PubSubSyncStatus::Detailed(SyncStatusMetadata {
283 syncing: true,
284 starting_block: 0,
285 current_block,
286 highest_block: Some(current_block),
287 })
288 } else {
289 PubSubSyncStatus::Simple(false)
290 }
291 }
292}
293
294impl<Eth> EthPubSubInner<Eth>
295where
296 Eth: RpcNodeCore<Pool: TransactionPool>,
297{
298 fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
300 ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
301 }
302
303 fn full_pending_transaction_stream(
305 &self,
306 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
307 self.eth_api.pool().new_pending_pool_transactions_listener()
308 }
309}
310
311impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
312where
313 Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
314{
315 fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
317 self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
318 let headers = new_chain.committed().headers().collect::<Vec<_>>();
319 futures::stream::iter(
320 headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
321 )
322 })
323 }
324
325 fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
327 BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
328 .map(move |canon_state| {
329 canon_state.expect("new block subscription never ends").block_receipts()
330 })
331 .flat_map(futures::stream::iter)
332 .flat_map(move |(block_receipts, removed)| {
333 let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
334 &filter,
335 block_receipts.block,
336 block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
337 removed,
338 );
339 futures::stream::iter(all_logs)
340 })
341 }
342}