reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7    pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8    FilteredParams, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_network_api::NetworkInfo;
15use reth_primitives::NodePrimitives;
16use reth_provider::{BlockNumReader, CanonStateSubscriptions};
17use reth_rpc_eth_api::{
18    pubsub::EthPubSubApiServer, EthApiTypes, RpcNodeCore, RpcTransaction, TransactionCompat,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_rpc_types_compat::transaction::from_recovered;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27    wrappers::{BroadcastStream, ReceiverStream},
28    Stream,
29};
30use tracing::error;
31
32/// `Eth` pubsub RPC implementation.
33///
34/// This handles `eth_subscribe` RPC calls.
35#[derive(Clone)]
36pub struct EthPubSub<Eth, Events> {
37    /// All nested fields bundled together.
38    inner: Arc<EthPubSubInner<Eth, Events>>,
39    /// The type that's used to spawn subscription tasks.
40    subscription_task_spawner: Box<dyn TaskSpawner>,
41}
42
43// === impl EthPubSub ===
44
45impl<Eth, Events> EthPubSub<Eth, Events> {
46    /// Creates a new, shareable instance.
47    ///
48    /// Subscription tasks are spawned via [`tokio::task::spawn`]
49    pub fn new(eth_api: Eth, chain_events: Events) -> Self {
50        Self::with_spawner(eth_api, chain_events, Box::<TokioTaskExecutor>::default())
51    }
52
53    /// Creates a new, shareable instance.
54    pub fn with_spawner(
55        eth_api: Eth,
56        chain_events: Events,
57        subscription_task_spawner: Box<dyn TaskSpawner>,
58    ) -> Self {
59        let inner = EthPubSubInner { eth_api, chain_events };
60        Self { inner: Arc::new(inner), subscription_task_spawner }
61    }
62}
63
64#[async_trait::async_trait]
65impl<Eth, Events> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth, Events>
66where
67    Events: CanonStateSubscriptions + 'static,
68    Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
69        + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
70        + 'static,
71{
72    /// Handler for `eth_subscribe`
73    async fn subscribe(
74        &self,
75        pending: PendingSubscriptionSink,
76        kind: SubscriptionKind,
77        params: Option<Params>,
78    ) -> jsonrpsee::core::SubscriptionResult {
79        let sink = pending.accept().await?;
80        let pubsub = self.inner.clone();
81        self.subscription_task_spawner.spawn(Box::pin(async move {
82            let _ = handle_accepted(pubsub, sink, kind, params).await;
83        }));
84
85        Ok(())
86    }
87}
88
89/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
90async fn handle_accepted<Eth, Events>(
91    pubsub: Arc<EthPubSubInner<Eth, Events>>,
92    accepted_sink: SubscriptionSink,
93    kind: SubscriptionKind,
94    params: Option<Params>,
95) -> Result<(), ErrorObject<'static>>
96where
97    Events: CanonStateSubscriptions + 'static,
98    Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
99        + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
100{
101    match kind {
102        SubscriptionKind::NewHeads => {
103            pipe_from_stream(accepted_sink, pubsub.new_headers_stream()).await
104        }
105        SubscriptionKind::Logs => {
106            // if no params are provided, used default filter params
107            let filter = match params {
108                Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
109                Some(Params::Bool(_)) => {
110                    return Err(invalid_params_rpc_err("Invalid params for logs"))
111                }
112                _ => FilteredParams::default(),
113            };
114            pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
115        }
116        SubscriptionKind::NewPendingTransactions => {
117            if let Some(params) = params {
118                match params {
119                    Params::Bool(true) => {
120                        // full transaction objects requested
121                        let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
122                            let tx_value = match from_recovered(
123                                tx.transaction.to_consensus(),
124                                pubsub.eth_api.tx_resp_builder(),
125                            ) {
126                                Ok(tx) => Some(tx),
127                                Err(err) => {
128                                    error!(target = "rpc",
129                                        %err,
130                                        "Failed to fill transaction with block context"
131                                    );
132                                    None
133                                }
134                            };
135                            std::future::ready(tx_value)
136                        });
137                        return pipe_from_stream(accepted_sink, stream).await
138                    }
139                    Params::Bool(false) | Params::None => {
140                        // only hashes requested
141                    }
142                    Params::Logs(_) => {
143                        return Err(invalid_params_rpc_err(
144                            "Invalid params for newPendingTransactions",
145                        ))
146                    }
147                }
148            }
149
150            pipe_from_stream(accepted_sink, pubsub.pending_transaction_hashes_stream()).await
151        }
152        SubscriptionKind::Syncing => {
153            // get new block subscription
154            let mut canon_state =
155                BroadcastStream::new(pubsub.chain_events.subscribe_to_canonical_state());
156            // get current sync status
157            let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
158            let current_sub_res = pubsub.sync_status(initial_sync_status);
159
160            // send the current status immediately
161            let msg = SubscriptionMessage::from_json(&current_sub_res)
162                .map_err(SubscriptionSerializeError::new)?;
163            if accepted_sink.send(msg).await.is_err() {
164                return Ok(())
165            }
166
167            while canon_state.next().await.is_some() {
168                let current_syncing = pubsub.eth_api.network().is_syncing();
169                // Only send a new response if the sync status has changed
170                if current_syncing != initial_sync_status {
171                    // Update the sync status on each new block
172                    initial_sync_status = current_syncing;
173
174                    // send a new message now that the status changed
175                    let sync_status = pubsub.sync_status(current_syncing);
176                    let msg = SubscriptionMessage::from_json(&sync_status)
177                        .map_err(SubscriptionSerializeError::new)?;
178                    if accepted_sink.send(msg).await.is_err() {
179                        break
180                    }
181                }
182            }
183
184            Ok(())
185        }
186    }
187}
188
189/// Helper to convert a serde error into an [`ErrorObject`]
190#[derive(Debug, thiserror::Error)]
191#[error("Failed to serialize subscription item: {0}")]
192pub struct SubscriptionSerializeError(#[from] serde_json::Error);
193
194impl SubscriptionSerializeError {
195    const fn new(err: serde_json::Error) -> Self {
196        Self(err)
197    }
198}
199
200impl From<SubscriptionSerializeError> for ErrorObject<'static> {
201    fn from(value: SubscriptionSerializeError) -> Self {
202        internal_rpc_err(value.to_string())
203    }
204}
205
206/// Pipes all stream items to the subscription sink.
207async fn pipe_from_stream<T, St>(
208    sink: SubscriptionSink,
209    mut stream: St,
210) -> Result<(), ErrorObject<'static>>
211where
212    St: Stream<Item = T> + Unpin,
213    T: Serialize,
214{
215    loop {
216        tokio::select! {
217            _ = sink.closed() => {
218                // connection dropped
219                break Ok(())
220            },
221            maybe_item = stream.next() => {
222                let item = match maybe_item {
223                    Some(item) => item,
224                    None => {
225                        // stream ended
226                        break  Ok(())
227                    },
228                };
229                let msg = SubscriptionMessage::from_json(&item).map_err(SubscriptionSerializeError::new)?;
230                if sink.send(msg).await.is_err() {
231                    break Ok(());
232                }
233            }
234        }
235    }
236}
237
238impl<Eth, Events> std::fmt::Debug for EthPubSub<Eth, Events> {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        f.debug_struct("EthPubSub").finish_non_exhaustive()
241    }
242}
243
244/// Container type `EthPubSub`
245#[derive(Clone)]
246struct EthPubSubInner<EthApi, Events> {
247    /// The `eth` API.
248    eth_api: EthApi,
249    /// A type that allows to create new event subscriptions.
250    chain_events: Events,
251}
252
253// == impl EthPubSubInner ===
254
255impl<Eth, Events> EthPubSubInner<Eth, Events>
256where
257    Eth: RpcNodeCore<Provider: BlockNumReader>,
258{
259    /// Returns the current sync status for the `syncing` subscription
260    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
261        if is_syncing {
262            let current_block = self
263                .eth_api
264                .provider()
265                .chain_info()
266                .map(|info| info.best_number)
267                .unwrap_or_default();
268            PubSubSyncStatus::Detailed(SyncStatusMetadata {
269                syncing: true,
270                starting_block: 0,
271                current_block,
272                highest_block: Some(current_block),
273            })
274        } else {
275            PubSubSyncStatus::Simple(false)
276        }
277    }
278}
279
280impl<Eth, Events> EthPubSubInner<Eth, Events>
281where
282    Eth: RpcNodeCore<Pool: TransactionPool>,
283{
284    /// Returns a stream that yields all transaction hashes emitted by the txpool.
285    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
286        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
287    }
288
289    /// Returns a stream that yields all transactions emitted by the txpool.
290    fn full_pending_transaction_stream(
291        &self,
292    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
293        self.eth_api.pool().new_pending_pool_transactions_listener()
294    }
295}
296
297impl<Eth, Events> EthPubSubInner<Eth, Events>
298where
299    Events: CanonStateSubscriptions,
300{
301    /// Returns a stream that yields all new RPC blocks.
302    fn new_headers_stream(
303        &self,
304    ) -> impl Stream<Item = Header<<Events::Primitives as NodePrimitives>::BlockHeader>> {
305        self.chain_events.canonical_state_stream().flat_map(|new_chain| {
306            let headers = new_chain.committed().headers().collect::<Vec<_>>();
307            futures::stream::iter(
308                headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
309            )
310        })
311    }
312
313    /// Returns a stream that yields all logs that match the given filter.
314    fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
315        BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())
316            .map(move |canon_state| {
317                canon_state.expect("new block subscription never ends").block_receipts()
318            })
319            .flat_map(futures::stream::iter)
320            .flat_map(move |(block_receipts, removed)| {
321                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
322                    &filter,
323                    block_receipts.block,
324                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
325                    removed,
326                );
327                futures::stream::iter(all_logs)
328            })
329    }
330}