1use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7 pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8 FilteredParams, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12 server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_network_api::NetworkInfo;
15use reth_primitives::NodePrimitives;
16use reth_provider::{BlockNumReader, CanonStateSubscriptions};
17use reth_rpc_eth_api::{
18 pubsub::EthPubSubApiServer, EthApiTypes, RpcNodeCore, RpcTransaction, TransactionCompat,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_rpc_types_compat::transaction::from_recovered;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27 wrappers::{BroadcastStream, ReceiverStream},
28 Stream,
29};
30use tracing::error;
31
32#[derive(Clone)]
36pub struct EthPubSub<Eth, Events> {
37 inner: Arc<EthPubSubInner<Eth, Events>>,
39 subscription_task_spawner: Box<dyn TaskSpawner>,
41}
42
43impl<Eth, Events> EthPubSub<Eth, Events> {
46 pub fn new(eth_api: Eth, chain_events: Events) -> Self {
50 Self::with_spawner(eth_api, chain_events, Box::<TokioTaskExecutor>::default())
51 }
52
53 pub fn with_spawner(
55 eth_api: Eth,
56 chain_events: Events,
57 subscription_task_spawner: Box<dyn TaskSpawner>,
58 ) -> Self {
59 let inner = EthPubSubInner { eth_api, chain_events };
60 Self { inner: Arc::new(inner), subscription_task_spawner }
61 }
62}
63
64#[async_trait::async_trait]
65impl<Eth, Events> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth, Events>
66where
67 Events: CanonStateSubscriptions + 'static,
68 Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
69 + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
70 + 'static,
71{
72 async fn subscribe(
74 &self,
75 pending: PendingSubscriptionSink,
76 kind: SubscriptionKind,
77 params: Option<Params>,
78 ) -> jsonrpsee::core::SubscriptionResult {
79 let sink = pending.accept().await?;
80 let pubsub = self.inner.clone();
81 self.subscription_task_spawner.spawn(Box::pin(async move {
82 let _ = handle_accepted(pubsub, sink, kind, params).await;
83 }));
84
85 Ok(())
86 }
87}
88
89async fn handle_accepted<Eth, Events>(
91 pubsub: Arc<EthPubSubInner<Eth, Events>>,
92 accepted_sink: SubscriptionSink,
93 kind: SubscriptionKind,
94 params: Option<Params>,
95) -> Result<(), ErrorObject<'static>>
96where
97 Events: CanonStateSubscriptions + 'static,
98 Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
99 + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
100{
101 match kind {
102 SubscriptionKind::NewHeads => {
103 pipe_from_stream(accepted_sink, pubsub.new_headers_stream()).await
104 }
105 SubscriptionKind::Logs => {
106 let filter = match params {
108 Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
109 Some(Params::Bool(_)) => {
110 return Err(invalid_params_rpc_err("Invalid params for logs"))
111 }
112 _ => FilteredParams::default(),
113 };
114 pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
115 }
116 SubscriptionKind::NewPendingTransactions => {
117 if let Some(params) = params {
118 match params {
119 Params::Bool(true) => {
120 let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
122 let tx_value = match from_recovered(
123 tx.transaction.to_consensus(),
124 pubsub.eth_api.tx_resp_builder(),
125 ) {
126 Ok(tx) => Some(tx),
127 Err(err) => {
128 error!(target = "rpc",
129 %err,
130 "Failed to fill transaction with block context"
131 );
132 None
133 }
134 };
135 std::future::ready(tx_value)
136 });
137 return pipe_from_stream(accepted_sink, stream).await
138 }
139 Params::Bool(false) | Params::None => {
140 }
142 Params::Logs(_) => {
143 return Err(invalid_params_rpc_err(
144 "Invalid params for newPendingTransactions",
145 ))
146 }
147 }
148 }
149
150 pipe_from_stream(accepted_sink, pubsub.pending_transaction_hashes_stream()).await
151 }
152 SubscriptionKind::Syncing => {
153 let mut canon_state =
155 BroadcastStream::new(pubsub.chain_events.subscribe_to_canonical_state());
156 let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
158 let current_sub_res = pubsub.sync_status(initial_sync_status);
159
160 let msg = SubscriptionMessage::from_json(¤t_sub_res)
162 .map_err(SubscriptionSerializeError::new)?;
163 if accepted_sink.send(msg).await.is_err() {
164 return Ok(())
165 }
166
167 while canon_state.next().await.is_some() {
168 let current_syncing = pubsub.eth_api.network().is_syncing();
169 if current_syncing != initial_sync_status {
171 initial_sync_status = current_syncing;
173
174 let sync_status = pubsub.sync_status(current_syncing);
176 let msg = SubscriptionMessage::from_json(&sync_status)
177 .map_err(SubscriptionSerializeError::new)?;
178 if accepted_sink.send(msg).await.is_err() {
179 break
180 }
181 }
182 }
183
184 Ok(())
185 }
186 }
187}
188
189#[derive(Debug, thiserror::Error)]
191#[error("Failed to serialize subscription item: {0}")]
192pub struct SubscriptionSerializeError(#[from] serde_json::Error);
193
194impl SubscriptionSerializeError {
195 const fn new(err: serde_json::Error) -> Self {
196 Self(err)
197 }
198}
199
200impl From<SubscriptionSerializeError> for ErrorObject<'static> {
201 fn from(value: SubscriptionSerializeError) -> Self {
202 internal_rpc_err(value.to_string())
203 }
204}
205
206async fn pipe_from_stream<T, St>(
208 sink: SubscriptionSink,
209 mut stream: St,
210) -> Result<(), ErrorObject<'static>>
211where
212 St: Stream<Item = T> + Unpin,
213 T: Serialize,
214{
215 loop {
216 tokio::select! {
217 _ = sink.closed() => {
218 break Ok(())
220 },
221 maybe_item = stream.next() => {
222 let item = match maybe_item {
223 Some(item) => item,
224 None => {
225 break Ok(())
227 },
228 };
229 let msg = SubscriptionMessage::from_json(&item).map_err(SubscriptionSerializeError::new)?;
230 if sink.send(msg).await.is_err() {
231 break Ok(());
232 }
233 }
234 }
235 }
236}
237
238impl<Eth, Events> std::fmt::Debug for EthPubSub<Eth, Events> {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 f.debug_struct("EthPubSub").finish_non_exhaustive()
241 }
242}
243
244#[derive(Clone)]
246struct EthPubSubInner<EthApi, Events> {
247 eth_api: EthApi,
249 chain_events: Events,
251}
252
253impl<Eth, Events> EthPubSubInner<Eth, Events>
256where
257 Eth: RpcNodeCore<Provider: BlockNumReader>,
258{
259 fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
261 if is_syncing {
262 let current_block = self
263 .eth_api
264 .provider()
265 .chain_info()
266 .map(|info| info.best_number)
267 .unwrap_or_default();
268 PubSubSyncStatus::Detailed(SyncStatusMetadata {
269 syncing: true,
270 starting_block: 0,
271 current_block,
272 highest_block: Some(current_block),
273 })
274 } else {
275 PubSubSyncStatus::Simple(false)
276 }
277 }
278}
279
280impl<Eth, Events> EthPubSubInner<Eth, Events>
281where
282 Eth: RpcNodeCore<Pool: TransactionPool>,
283{
284 fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
286 ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
287 }
288
289 fn full_pending_transaction_stream(
291 &self,
292 ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
293 self.eth_api.pool().new_pending_pool_transactions_listener()
294 }
295}
296
297impl<Eth, Events> EthPubSubInner<Eth, Events>
298where
299 Events: CanonStateSubscriptions,
300{
301 fn new_headers_stream(
303 &self,
304 ) -> impl Stream<Item = Header<<Events::Primitives as NodePrimitives>::BlockHeader>> {
305 self.chain_events.canonical_state_stream().flat_map(|new_chain| {
306 let headers = new_chain.committed().headers().collect::<Vec<_>>();
307 futures::stream::iter(
308 headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
309 )
310 })
311 }
312
313 fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
315 BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())
316 .map(move |canon_state| {
317 canon_state.expect("new block subscription never ends").block_receipts()
318 })
319 .flat_map(futures::stream::iter)
320 .flat_map(move |(block_receipts, removed)| {
321 let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
322 &filter,
323 block_receipts.block,
324 block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
325 removed,
326 );
327 futures::stream::iter(all_logs)
328 })
329 }
330}