reth_transaction_pool/validate/
task.rs

1//! A validation service for transactions.
2
3use crate::{
4    blobstore::BlobStore,
5    validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
6    EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
7    TransactionValidator,
8};
9use futures_util::{lock::Mutex, StreamExt};
10use reth_chainspec::ChainSpec;
11use reth_primitives::SealedBlock;
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15    sync,
16    sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
20/// Represents a future outputting unit type and is sendable.
21type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
23/// Represents a stream of validation futures.
24type ValidationStream = ReceiverStream<ValidationFuture>;
25
26/// A service that performs validation jobs.
27///
28/// This listens for incoming validation jobs and executes them.
29///
30/// This should be spawned as a task: [`ValidationTask::run`]
31#[derive(Clone)]
32pub struct ValidationTask {
33    validation_jobs: Arc<Mutex<ValidationStream>>,
34}
35
36impl ValidationTask {
37    /// Creates a new clonable task pair
38    pub fn new() -> (ValidationJobSender, Self) {
39        let (tx, rx) = mpsc::channel(1);
40        (ValidationJobSender { tx }, Self::with_receiver(rx))
41    }
42
43    /// Creates a new task with the given receiver.
44    pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
45        Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
46    }
47
48    /// Executes all new validation jobs that come in.
49    ///
50    /// This will run as long as the channel is alive and is expected to be spawned as a task.
51    pub async fn run(self) {
52        while let Some(task) = self.validation_jobs.lock().await.next().await {
53            task.await;
54        }
55    }
56}
57
58impl std::fmt::Debug for ValidationTask {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
61    }
62}
63
64/// A sender new type for sending validation jobs to [`ValidationTask`].
65#[derive(Debug)]
66pub struct ValidationJobSender {
67    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
68}
69
70impl ValidationJobSender {
71    /// Sends the given job to the validation task.
72    pub async fn send(
73        &self,
74        job: Pin<Box<dyn Future<Output = ()> + Send>>,
75    ) -> Result<(), TransactionValidatorError> {
76        self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
77    }
78}
79
80/// A [`TransactionValidator`] implementation that validates ethereum transaction.
81///
82/// This validator is non-blocking, all validation work is done in a separate task.
83#[derive(Debug, Clone)]
84pub struct TransactionValidationTaskExecutor<V> {
85    /// The validator that will validate transactions on a separate task.
86    pub validator: V,
87    /// The sender half to validation tasks that perform the actual validation.
88    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
89}
90
91// === impl TransactionValidationTaskExecutor ===
92
93impl TransactionValidationTaskExecutor<()> {
94    /// Convenience method to create a [`EthTransactionValidatorBuilder`]
95    pub fn eth_builder(chain_spec: Arc<ChainSpec>) -> EthTransactionValidatorBuilder {
96        EthTransactionValidatorBuilder::new(chain_spec)
97    }
98}
99
100impl<V> TransactionValidationTaskExecutor<V> {
101    /// Maps the given validator to a new type.
102    pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
103    where
104        F: FnMut(V) -> T,
105    {
106        TransactionValidationTaskExecutor {
107            validator: f(self.validator),
108            to_validation_task: self.to_validation_task,
109        }
110    }
111}
112
113impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
114    /// Creates a new instance for the given [`ChainSpec`]
115    ///
116    /// This will spawn a single validation tasks that performs the actual validation.
117    /// See [`TransactionValidationTaskExecutor::eth_with_additional_tasks`]
118    pub fn eth<T, S: BlobStore>(
119        client: Client,
120        chain_spec: Arc<ChainSpec>,
121        blob_store: S,
122        tasks: T,
123    ) -> Self
124    where
125        T: TaskSpawner,
126    {
127        Self::eth_with_additional_tasks(client, chain_spec, blob_store, tasks, 0)
128    }
129
130    /// Creates a new instance for the given [`ChainSpec`]
131    ///
132    /// By default this will enable support for:
133    ///   - shanghai
134    ///   - eip1559
135    ///   - eip2930
136    ///
137    /// This will always spawn a validation task that performs the actual validation. It will spawn
138    /// `num_additional_tasks` additional tasks.
139    pub fn eth_with_additional_tasks<T, S: BlobStore>(
140        client: Client,
141        chain_spec: Arc<ChainSpec>,
142        blob_store: S,
143        tasks: T,
144        num_additional_tasks: usize,
145    ) -> Self
146    where
147        T: TaskSpawner,
148    {
149        EthTransactionValidatorBuilder::new(chain_spec)
150            .with_additional_tasks(num_additional_tasks)
151            .build_with_tasks::<Client, Tx, T, S>(client, tasks, blob_store)
152    }
153}
154
155impl<V> TransactionValidationTaskExecutor<V> {
156    /// Creates a new executor instance with the given validator for transaction validation.
157    ///
158    /// Initializes the executor with the provided validator and sets up communication for
159    /// validation tasks.
160    pub fn new(validator: V) -> Self {
161        let (tx, _) = ValidationTask::new();
162        Self { validator, to_validation_task: Arc::new(sync::Mutex::new(tx)) }
163    }
164}
165
166impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
167where
168    V: TransactionValidator + Clone + 'static,
169{
170    type Transaction = <V as TransactionValidator>::Transaction;
171
172    async fn validate_transaction(
173        &self,
174        origin: TransactionOrigin,
175        transaction: Self::Transaction,
176    ) -> TransactionValidationOutcome<Self::Transaction> {
177        let hash = *transaction.hash();
178        let (tx, rx) = oneshot::channel();
179        {
180            let res = {
181                let to_validation_task = self.to_validation_task.clone();
182                let to_validation_task = to_validation_task.lock().await;
183                let validator = self.validator.clone();
184                to_validation_task
185                    .send(Box::pin(async move {
186                        let res = validator.validate_transaction(origin, transaction).await;
187                        let _ = tx.send(res);
188                    }))
189                    .await
190            };
191            if res.is_err() {
192                return TransactionValidationOutcome::Error(
193                    hash,
194                    Box::new(TransactionValidatorError::ValidationServiceUnreachable),
195                )
196            }
197        }
198
199        match rx.await {
200            Ok(res) => res,
201            Err(_) => TransactionValidationOutcome::Error(
202                hash,
203                Box::new(TransactionValidatorError::ValidationServiceUnreachable),
204            ),
205        }
206    }
207
208    fn on_new_head_block(&self, new_tip_block: &SealedBlock) {
209        self.validator.on_new_head_block(new_tip_block)
210    }
211}