reth_transaction_pool/validate/
task.rs1use crate::{
4 blobstore::BlobStore,
5 validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
6 EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
7 TransactionValidator,
8};
9use futures_util::{lock::Mutex, StreamExt};
10use reth_chainspec::ChainSpec;
11use reth_primitives::SealedBlock;
12use reth_tasks::TaskSpawner;
13use std::{future::Future, pin::Pin, sync::Arc};
14use tokio::{
15 sync,
16 sync::{mpsc, oneshot},
17};
18use tokio_stream::wrappers::ReceiverStream;
19
/// A boxed, pinned, `Send` future that produces no output.
///
/// This is the unit of work that travels through the validation job channel.
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
22
/// A [`ReceiverStream`] that yields [`ValidationFuture`]s.
type ValidationStream = ReceiverStream<ValidationFuture>;
25
/// The consuming side of the validation job channel.
///
/// The job stream sits behind an `Arc<Mutex<_>>`, so every clone of this type pulls jobs from
/// the same underlying stream.
#[derive(Clone)]
pub struct ValidationTask {
    /// Shared stream of pending validation jobs.
    validation_jobs: Arc<Mutex<ValidationStream>>,
}
35
36impl ValidationTask {
37 pub fn new() -> (ValidationJobSender, Self) {
39 let (tx, rx) = mpsc::channel(1);
40 (ValidationJobSender { tx }, Self::with_receiver(rx))
41 }
42
43 pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
45 Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
46 }
47
48 pub async fn run(self) {
52 while let Some(task) = self.validation_jobs.lock().await.next().await {
53 task.await;
54 }
55 }
56}
57
58impl std::fmt::Debug for ValidationTask {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
61 }
62}
63
/// The sending half of the validation job channel.
#[derive(Debug)]
pub struct ValidationJobSender {
    /// Channel sender through which boxed validation futures are submitted.
    tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
69
70impl ValidationJobSender {
71 pub async fn send(
73 &self,
74 job: Pin<Box<dyn Future<Output = ()> + Send>>,
75 ) -> Result<(), TransactionValidatorError> {
76 self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
77 }
78}
79
/// A validator wrapper that pairs a validator with a handle for submitting validation jobs.
///
/// Clones share the same job sender because it is stored behind an `Arc`.
#[derive(Debug, Clone)]
pub struct TransactionValidationTaskExecutor<V> {
    /// The wrapped validator.
    pub validator: V,
    /// Shared, mutex-guarded sender used to submit validation jobs.
    pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
}
90
impl TransactionValidationTaskExecutor<()> {
    /// Returns a new [`EthTransactionValidatorBuilder`] created from the given chain spec.
    pub fn eth_builder(chain_spec: Arc<ChainSpec>) -> EthTransactionValidatorBuilder {
        EthTransactionValidatorBuilder::new(chain_spec)
    }
}
99
100impl<V> TransactionValidationTaskExecutor<V> {
101 pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
103 where
104 F: FnMut(V) -> T,
105 {
106 TransactionValidationTaskExecutor {
107 validator: f(self.validator),
108 to_validation_task: self.to_validation_task,
109 }
110 }
111}
112
113impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
114 pub fn eth<T, S: BlobStore>(
119 client: Client,
120 chain_spec: Arc<ChainSpec>,
121 blob_store: S,
122 tasks: T,
123 ) -> Self
124 where
125 T: TaskSpawner,
126 {
127 Self::eth_with_additional_tasks(client, chain_spec, blob_store, tasks, 0)
128 }
129
130 pub fn eth_with_additional_tasks<T, S: BlobStore>(
140 client: Client,
141 chain_spec: Arc<ChainSpec>,
142 blob_store: S,
143 tasks: T,
144 num_additional_tasks: usize,
145 ) -> Self
146 where
147 T: TaskSpawner,
148 {
149 EthTransactionValidatorBuilder::new(chain_spec)
150 .with_additional_tasks(num_additional_tasks)
151 .build_with_tasks::<Client, Tx, T, S>(client, tasks, blob_store)
152 }
153}
154
155impl<V> TransactionValidationTaskExecutor<V> {
156 pub fn new(validator: V) -> Self {
161 let (tx, _) = ValidationTask::new();
162 Self { validator, to_validation_task: Arc::new(sync::Mutex::new(tx)) }
163 }
164}
165
166impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
167where
168 V: TransactionValidator + Clone + 'static,
169{
170 type Transaction = <V as TransactionValidator>::Transaction;
171
172 async fn validate_transaction(
173 &self,
174 origin: TransactionOrigin,
175 transaction: Self::Transaction,
176 ) -> TransactionValidationOutcome<Self::Transaction> {
177 let hash = *transaction.hash();
178 let (tx, rx) = oneshot::channel();
179 {
180 let res = {
181 let to_validation_task = self.to_validation_task.clone();
182 let to_validation_task = to_validation_task.lock().await;
183 let validator = self.validator.clone();
184 to_validation_task
185 .send(Box::pin(async move {
186 let res = validator.validate_transaction(origin, transaction).await;
187 let _ = tx.send(res);
188 }))
189 .await
190 };
191 if res.is_err() {
192 return TransactionValidationOutcome::Error(
193 hash,
194 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
195 )
196 }
197 }
198
199 match rx.await {
200 Ok(res) => res,
201 Err(_) => TransactionValidationOutcome::Error(
202 hash,
203 Box::new(TransactionValidatorError::ValidationServiceUnreachable),
204 ),
205 }
206 }
207
208 fn on_new_head_block(&self, new_tip_block: &SealedBlock) {
209 self.validator.on_new_head_block(new_tip_block)
210 }
211}