reth_rpc/eth/
pubsub.rs

1//! `eth_` `PubSub` RPC handler implementation
2
3use std::sync::Arc;
4
5use alloy_primitives::TxHash;
6use alloy_rpc_types_eth::{
7    pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
8    Filter, Header, Log,
9};
10use futures::StreamExt;
11use jsonrpsee::{
12    server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink,
13};
14use reth_chain_state::CanonStateSubscriptions;
15use reth_network_api::NetworkInfo;
16use reth_primitives_traits::NodePrimitives;
17use reth_rpc_eth_api::{
18    pubsub::EthPubSubApiServer, EthApiTypes, RpcNodeCore, RpcTransaction, TransactionCompat,
19};
20use reth_rpc_eth_types::logs_utils;
21use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
22use reth_storage_api::BlockNumReader;
23use reth_tasks::{TaskSpawner, TokioTaskExecutor};
24use reth_transaction_pool::{NewTransactionEvent, PoolConsensusTx, TransactionPool};
25use serde::Serialize;
26use tokio_stream::{
27    wrappers::{BroadcastStream, ReceiverStream},
28    Stream,
29};
30use tracing::error;
31
32/// `Eth` pubsub RPC implementation.
33///
34/// This handles `eth_subscribe` RPC calls.
35#[derive(Clone)]
36pub struct EthPubSub<Eth> {
37    /// All nested fields bundled together.
38    inner: Arc<EthPubSubInner<Eth>>,
39}
40
41// === impl EthPubSub ===
42
43impl<Eth> EthPubSub<Eth> {
44    /// Creates a new, shareable instance.
45    ///
46    /// Subscription tasks are spawned via [`tokio::task::spawn`]
47    pub fn new(eth_api: Eth) -> Self {
48        Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
49    }
50
51    /// Creates a new, shareable instance.
52    pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
53        let inner = EthPubSubInner { eth_api, subscription_task_spawner };
54        Self { inner: Arc::new(inner) }
55    }
56}
57
58#[async_trait::async_trait]
59impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
60where
61    Eth: RpcNodeCore<
62            Provider: BlockNumReader + CanonStateSubscriptions,
63            Pool: TransactionPool,
64            Network: NetworkInfo,
65        > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
66        + 'static,
67{
68    /// Handler for `eth_subscribe`
69    async fn subscribe(
70        &self,
71        pending: PendingSubscriptionSink,
72        kind: SubscriptionKind,
73        params: Option<Params>,
74    ) -> jsonrpsee::core::SubscriptionResult {
75        let sink = pending.accept().await?;
76        let pubsub = self.inner.clone();
77        self.inner.subscription_task_spawner.spawn(Box::pin(async move {
78            let _ = handle_accepted(pubsub, sink, kind, params).await;
79        }));
80
81        Ok(())
82    }
83}
84
85/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
86async fn handle_accepted<Eth>(
87    pubsub: Arc<EthPubSubInner<Eth>>,
88    accepted_sink: SubscriptionSink,
89    kind: SubscriptionKind,
90    params: Option<Params>,
91) -> Result<(), ErrorObject<'static>>
92where
93    Eth: RpcNodeCore<
94            Provider: BlockNumReader + CanonStateSubscriptions,
95            Pool: TransactionPool,
96            Network: NetworkInfo,
97        > + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
98{
99    match kind {
100        SubscriptionKind::NewHeads => {
101            pipe_from_stream(accepted_sink, pubsub.new_headers_stream()).await
102        }
103        SubscriptionKind::Logs => {
104            // if no params are provided, used default filter params
105            let filter = match params {
106                Some(Params::Logs(filter)) => *filter,
107                Some(Params::Bool(_)) => {
108                    return Err(invalid_params_rpc_err("Invalid params for logs"))
109                }
110                _ => Default::default(),
111            };
112            pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
113        }
114        SubscriptionKind::NewPendingTransactions => {
115            if let Some(params) = params {
116                match params {
117                    Params::Bool(true) => {
118                        // full transaction objects requested
119                        let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
120                            let tx_value = match pubsub
121                                .eth_api
122                                .tx_resp_builder()
123                                .fill_pending(tx.transaction.to_consensus())
124                            {
125                                Ok(tx) => Some(tx),
126                                Err(err) => {
127                                    error!(target = "rpc",
128                                        %err,
129                                        "Failed to fill transaction with block context"
130                                    );
131                                    None
132                                }
133                            };
134                            std::future::ready(tx_value)
135                        });
136                        return pipe_from_stream(accepted_sink, stream).await
137                    }
138                    Params::Bool(false) | Params::None => {
139                        // only hashes requested
140                    }
141                    Params::Logs(_) => {
142                        return Err(invalid_params_rpc_err(
143                            "Invalid params for newPendingTransactions",
144                        ))
145                    }
146                }
147            }
148
149            pipe_from_stream(accepted_sink, pubsub.pending_transaction_hashes_stream()).await
150        }
151        SubscriptionKind::Syncing => {
152            // get new block subscription
153            let mut canon_state =
154                BroadcastStream::new(pubsub.eth_api.provider().subscribe_to_canonical_state());
155            // get current sync status
156            let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
157            let current_sub_res = pubsub.sync_status(initial_sync_status);
158
159            // send the current status immediately
160            let msg = SubscriptionMessage::new(
161                accepted_sink.method_name(),
162                accepted_sink.subscription_id(),
163                &current_sub_res,
164            )
165            .map_err(SubscriptionSerializeError::new)?;
166
167            if accepted_sink.send(msg).await.is_err() {
168                return Ok(())
169            }
170
171            while canon_state.next().await.is_some() {
172                let current_syncing = pubsub.eth_api.network().is_syncing();
173                // Only send a new response if the sync status has changed
174                if current_syncing != initial_sync_status {
175                    // Update the sync status on each new block
176                    initial_sync_status = current_syncing;
177
178                    // send a new message now that the status changed
179                    let sync_status = pubsub.sync_status(current_syncing);
180                    let msg = SubscriptionMessage::new(
181                        accepted_sink.method_name(),
182                        accepted_sink.subscription_id(),
183                        &sync_status,
184                    )
185                    .map_err(SubscriptionSerializeError::new)?;
186
187                    if accepted_sink.send(msg).await.is_err() {
188                        break
189                    }
190                }
191            }
192
193            Ok(())
194        }
195    }
196}
197
198/// Helper to convert a serde error into an [`ErrorObject`]
199#[derive(Debug, thiserror::Error)]
200#[error("Failed to serialize subscription item: {0}")]
201pub struct SubscriptionSerializeError(#[from] serde_json::Error);
202
203impl SubscriptionSerializeError {
204    const fn new(err: serde_json::Error) -> Self {
205        Self(err)
206    }
207}
208
209impl From<SubscriptionSerializeError> for ErrorObject<'static> {
210    fn from(value: SubscriptionSerializeError) -> Self {
211        internal_rpc_err(value.to_string())
212    }
213}
214
215/// Pipes all stream items to the subscription sink.
216async fn pipe_from_stream<T, St>(
217    sink: SubscriptionSink,
218    mut stream: St,
219) -> Result<(), ErrorObject<'static>>
220where
221    St: Stream<Item = T> + Unpin,
222    T: Serialize,
223{
224    loop {
225        tokio::select! {
226            _ = sink.closed() => {
227                // connection dropped
228                break Ok(())
229            },
230            maybe_item = stream.next() => {
231                let item = match maybe_item {
232                    Some(item) => item,
233                    None => {
234                        // stream ended
235                        break  Ok(())
236                    },
237                };
238                let msg = SubscriptionMessage::new(
239                    sink.method_name(),
240                    sink.subscription_id(),
241                    &item
242                ).map_err(SubscriptionSerializeError::new)?;
243
244                if sink.send(msg).await.is_err() {
245                    break Ok(());
246                }
247            }
248        }
249    }
250}
251
252impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        f.debug_struct("EthPubSub").finish_non_exhaustive()
255    }
256}
257
258/// Container type `EthPubSub`
259#[derive(Clone)]
260struct EthPubSubInner<EthApi> {
261    /// The `eth` API.
262    eth_api: EthApi,
263    /// The type that's used to spawn subscription tasks.
264    subscription_task_spawner: Box<dyn TaskSpawner>,
265}
266
267// == impl EthPubSubInner ===
268
269impl<Eth> EthPubSubInner<Eth>
270where
271    Eth: RpcNodeCore<Provider: BlockNumReader>,
272{
273    /// Returns the current sync status for the `syncing` subscription
274    fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
275        if is_syncing {
276            let current_block = self
277                .eth_api
278                .provider()
279                .chain_info()
280                .map(|info| info.best_number)
281                .unwrap_or_default();
282            PubSubSyncStatus::Detailed(SyncStatusMetadata {
283                syncing: true,
284                starting_block: 0,
285                current_block,
286                highest_block: Some(current_block),
287            })
288        } else {
289            PubSubSyncStatus::Simple(false)
290        }
291    }
292}
293
294impl<Eth> EthPubSubInner<Eth>
295where
296    Eth: RpcNodeCore<Pool: TransactionPool>,
297{
298    /// Returns a stream that yields all transaction hashes emitted by the txpool.
299    fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
300        ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
301    }
302
303    /// Returns a stream that yields all transactions emitted by the txpool.
304    fn full_pending_transaction_stream(
305        &self,
306    ) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
307        self.eth_api.pool().new_pending_pool_transactions_listener()
308    }
309}
310
311impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
312where
313    Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
314{
315    /// Returns a stream that yields all new RPC blocks.
316    fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
317        self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
318            let headers = new_chain.committed().headers().collect::<Vec<_>>();
319            futures::stream::iter(
320                headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
321            )
322        })
323    }
324
325    /// Returns a stream that yields all logs that match the given filter.
326    fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
327        BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
328            .map(move |canon_state| {
329                canon_state.expect("new block subscription never ends").block_receipts()
330            })
331            .flat_map(futures::stream::iter)
332            .flat_map(move |(block_receipts, removed)| {
333                let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
334                    &filter,
335                    block_receipts.block,
336                    block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
337                    removed,
338                );
339                futures::stream::iter(all_logs)
340            })
341    }
342}