//! Reth's transaction pool implementation.
//!
//! This crate provides a generic transaction pool implementation.
//!
//! ## Functionality
//!
//! The transaction pool is responsible for
//!
//!    - recording incoming transactions
//!    - providing existing transactions
//!    - ordering and providing the best transactions for block production
//!    - monitoring memory footprint and enforcing pool size limits
//!    - storing blob data for transactions in a separate blobstore on insertion
//!
//! ## Assumptions
//!
//! ### Transaction type
//!
//! The pool expects certain Ethereum-related information from the generic transaction type of the
//! pool ([`PoolTransaction`]). This includes gas price, base fee (EIP-1559 transactions), nonce,
//! etc. It makes no assumptions about the encoding format, but the transaction type must report its
//! size so pool size limits (memory) can be enforced.
//!
//! ### Transaction ordering
//!
//! The pending pool contains transactions that can be mined on the current state.
//! The order in which they're returned is determined by a `Priority` value returned by the
//! `TransactionOrdering` type this pool is configured with.
//!
//! This is only used in the _pending_ pool to yield the best transactions for block production. The
//! _base pool_ is ordered by base fee, and the _queued pool_ by current distance.
//!
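//! As an illustration of priority-based ordering, the following std-only sketch ranks
//! transactions by the tip a block producer would earn at a given base fee. `SketchTx`,
//! `effective_tip_per_gas` and `order_by_tip` are hypothetical names used only for this example.
//! They are not this crate's types, and the real priority comes from whichever
//! `TransactionOrdering` the pool is configured with.
//!
//! ```rust
//! /// Hypothetical, simplified stand-in for a pooled EIP-1559 transaction.
//! #[derive(Debug, Clone, PartialEq, Eq)]
//! pub struct SketchTx {
//!     pub max_fee_per_gas: u128,
//!     pub max_priority_fee_per_gas: u128,
//! }
//!
//! /// Tip per gas earned at `base_fee`, or `None` if the fee cap is below the base fee.
//! pub fn effective_tip_per_gas(tx: &SketchTx, base_fee: u128) -> Option<u128> {
//!     let headroom = tx.max_fee_per_gas.checked_sub(base_fee)?;
//!     Some(headroom.min(tx.max_priority_fee_per_gas))
//! }
//!
//! /// Sorts transactions so the highest effective tip comes first; `None` sorts last.
//! pub fn order_by_tip(mut txs: Vec<SketchTx>, base_fee: u128) -> Vec<SketchTx> {
//!     txs.sort_by_key(|tx| std::cmp::Reverse(effective_tip_per_gas(tx, base_fee)));
//!     txs
//! }
//! ```
//!
//! With a base fee of 40, a transaction capped at 50 with a priority fee of 30 yields a tip of 10
//! and therefore ranks ahead of one capped at 100 with a priority fee of 2.
//!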
//! ### Validation
//!
//! The pool itself does not validate incoming transactions. Instead, validation is provided by
//! implementing `TransactionValidator`. Only transactions that the validator returns as valid are
//! included in the pool. It is assumed that transactions in the pool are either valid on the
//! current state or could become valid after certain state changes. Transactions that can never
//! become valid (e.g. nonce lower than the current on-chain nonce) are never added to the pool and
//! are instead discarded right away.
//!
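//! The three cases described above can be sketched with a nonce check alone. This is a
//! simplified, std-only illustration: `SketchOutcome` and `classify_nonce` are hypothetical names,
//! and a real validator checks far more than the nonce.
//!
//! ```rust
//! use std::cmp::Ordering;
//!
//! /// Hypothetical outcome of checking a transaction nonce against the account state.
//! #[derive(Debug, PartialEq, Eq)]
//! pub enum SketchOutcome {
//!     /// Executable on the current state.
//!     ValidNow,
//!     /// Not executable yet, but could become so after future state changes.
//!     ValidLater,
//!     /// Can never become valid and is discarded right away.
//!     Discard,
//! }
//!
//! /// Classifies a transaction by comparing its nonce to the sender's on-chain nonce.
//! pub fn classify_nonce(tx_nonce: u64, on_chain_nonce: u64) -> SketchOutcome {
//!     match tx_nonce.cmp(&on_chain_nonce) {
//!         Ordering::Less => SketchOutcome::Discard,
//!         Ordering::Equal => SketchOutcome::ValidNow,
//!         Ordering::Greater => SketchOutcome::ValidLater,
//!     }
//! }
//! ```
//!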
//! ### State Changes
//!
//! Once a new block is mined, the pool needs to be updated with a changeset in order to:
//!
//!   - remove mined transactions
//!   - update transactions using account changes (e.g. balance changes)
//!   - apply base fee updates
//!
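//! The three update steps can be sketched as follows. This is a minimal, std-only model under
//! stated assumptions: `SketchPool`, `SketchPooledTx` and `SketchChangeset` are hypothetical
//! types, senders and hashes are plain integers, and the crate's actual update path is
//! considerably more involved.
//!
//! ```rust
//! use std::collections::{HashMap, HashSet};
//!
//! /// Hypothetical pooled transaction.
//! #[derive(Debug, Clone, PartialEq, Eq)]
//! pub struct SketchPooledTx {
//!     pub hash: u64,
//!     pub sender: u8,
//!     pub cost: u128,
//!     pub max_fee_per_gas: u64,
//! }
//!
//! /// Hypothetical changeset derived from a newly mined block.
//! pub struct SketchChangeset {
//!     pub mined: HashSet<u64>,
//!     pub balances: HashMap<u8, u128>,
//!     pub base_fee: u64,
//! }
//!
//! /// Hypothetical pool state.
//! pub struct SketchPool {
//!     pub txs: Vec<SketchPooledTx>,
//!     pub balances: HashMap<u8, u128>,
//!     pub base_fee: u64,
//! }
//!
//! impl SketchPool {
//!     /// Applies a changeset: drop mined transactions, then record balances and base fee.
//!     pub fn on_new_block(&mut self, update: SketchChangeset) {
//!         self.txs.retain(|tx| !update.mined.contains(&tx.hash));
//!         self.balances.extend(update.balances);
//!         self.base_fee = update.base_fee;
//!     }
//!
//!     /// Transactions that are affordable and whose fee cap covers the current base fee.
//!     pub fn executable(&self) -> Vec<&SketchPooledTx> {
//!         self.txs
//!             .iter()
//!             .filter(|tx| {
//!                 let balance = self.balances.get(&tx.sender).copied().unwrap_or(0);
//!                 tx.cost <= balance && tx.max_fee_per_gas >= self.base_fee
//!             })
//!             .collect()
//!     }
//! }
//! ```
//!
//! Transactions that stop being executable after an update stay in the sketch pool, mirroring the
//! assumption that they could become valid again after later state changes.
//!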
//! ## Implementation details
//!
//! The `TransactionPool` trait exposes all externally used functionality of the pool, such as
//! inserting, querying specific transactions by hash or retrieving the best transactions.
//! In addition, it enables the registration of event listeners that are notified of state changes.
//! Events are communicated via channels.
//!
//! ### Architecture
//!
//! The final `TransactionPool` is made up of two layers:
//!
//! The lowest layer is the actual pool implementation that manages (validated) transactions:
//! [`TxPool`](crate::pool::txpool::TxPool). This is contained in a higher-level pool type that
//! guards the low-level pool and handles additional listeners or metrics: [`PoolInner`].
//!
//! The transaction pool will be used by separate consumers (RPC, P2P). To make sharing easier, the
//! [`Pool`] type is just an `Arc` wrapper around `PoolInner`. This is the usable type that provides
//! the `TransactionPool` interface.
//!
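//! The `Arc` wrapper pattern can be sketched with std types only. `SketchHandle` and
//! `SketchInner` are hypothetical stand-ins, and the `Mutex<Vec<u64>>` is an assumption made for
//! brevity rather than a description of how `PoolInner` guards its state.
//!
//! ```rust
//! use std::sync::{Arc, Mutex};
//!
//! /// Hypothetical stand-in for the guarded pool internals.
//! #[derive(Debug, Default)]
//! pub struct SketchInner {
//!     txs: Mutex<Vec<u64>>,
//! }
//!
//! /// Cheaply cloneable handle; every clone points at the same `SketchInner`.
//! #[derive(Debug, Clone, Default)]
//! pub struct SketchHandle {
//!     inner: Arc<SketchInner>,
//! }
//!
//! impl SketchHandle {
//!     /// Records a transaction (represented here by a bare `u64`).
//!     pub fn add(&self, tx: u64) {
//!         self.inner.txs.lock().unwrap().push(tx);
//!     }
//!
//!     /// Number of recorded transactions, as seen through any clone of the handle.
//!     pub fn len(&self) -> usize {
//!         self.inner.txs.lock().unwrap().len()
//!     }
//! }
//! ```
//!
//! Cloning the handle only bumps a reference count, so each consumer can hold its own copy while
//! all of them observe the same underlying state.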
//!
//! ## Blob Transactions
//!
//! Blob transactions can be quite large, hence they are stored in a separate blobstore. The pool
//! is responsible for inserting blob data for new transactions into the blobstore.
//! See also [`ValidTransaction`](validate::ValidTransaction).
//!
//!
//! ## Examples
//!
//! Listen for new transactions and print them:
//!
//! ```
//! use reth_chainspec::MAINNET;
//! use reth_storage_api::StateProviderFactory;
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool};
//! use reth_transaction_pool::blobstore::InMemoryBlobStore;
//! async fn t<C>(client: C)
//! where
//!     C: StateProviderFactory + Clone + 'static,
//! {
//!     let blob_store = InMemoryBlobStore::default();
//!     let pool = Pool::eth_pool(
//!         TransactionValidationTaskExecutor::eth(
//!             client,
//!             MAINNET.clone(),
//!             blob_store.clone(),
//!             TokioTaskExecutor::default(),
//!         ),
//!         blob_store,
//!         Default::default(),
//!     );
//!     let mut transactions = pool.pending_transactions_listener();
//!     tokio::task::spawn(async move {
//!         while let Some(tx) = transactions.recv().await {
//!             println!("New transaction: {:?}", tx);
//!         }
//!     });
//!
//!     // do something useful with the pool, like RPC integration
//! }
//! ```
//!
//! Spawn a maintenance task to keep the pool updated:
//!
//! ```
//! use futures_util::Stream;
//! use reth_chain_state::CanonStateNotification;
//! use reth_chainspec::{MAINNET, ChainSpecProvider, ChainSpec};
//! use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_tasks::TaskSpawner;
//! use reth_tasks::TaskManager;
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
//! use reth_transaction_pool::blobstore::InMemoryBlobStore;
//! use reth_transaction_pool::maintain::{maintain_transaction_pool_future};
//!
//! async fn t<C, St>(client: C, stream: St)
//! where
//!     C: StateProviderFactory
//!         + BlockReaderIdExt
//!         + ChainSpecProvider<ChainSpec = ChainSpec>
//!         + Clone
//!         + 'static,
//!     St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
//! {
//!     let blob_store = InMemoryBlobStore::default();
//!     let rt = tokio::runtime::Runtime::new().unwrap();
//!     let manager = TaskManager::new(rt.handle().clone());
//!     let executor = manager.executor();
//!     let pool = Pool::eth_pool(
//!         TransactionValidationTaskExecutor::eth(
//!             client.clone(),
//!             MAINNET.clone(),
//!             blob_store.clone(),
//!             executor.clone(),
//!         ),
//!         blob_store,
//!         Default::default(),
//!     );
//!
//!     // spawn a task that listens for new blocks and updates the pool, e.g. by removing mined
//!     // transactions
//!     tokio::task::spawn(maintain_transaction_pool_future(
//!         client,
//!         pool,
//!         stream,
//!         executor.clone(),
//!         Default::default(),
//!     ));
//! }
//! ```
//!
//! ## Feature Flags
//!
//! - `serde` (default): Enable serde support
//! - `test-utils`: Export utilities for testing

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

use crate::{identifier::TransactionId, pool::PoolInner};
use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
use alloy_primitives::{Address, TxHash, B256, U256};
use aquamarine as _;
use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount;
use reth_primitives::RecoveredTx;
use reth_storage_api::StateProviderFactory;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::mpsc::Receiver;
use tracing::{instrument, trace};

pub use crate::{
    blobstore::{BlobStore, BlobStoreError},
    config::{
        LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP,
        DEFAULT_TXPOOL_ADDITIONAL_VALIDATION_TASKS, MAX_NEW_PENDING_TXS_NOTIFICATIONS,
        REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
        TXPOOL_SUBPOOL_MAX_SIZE_MB_DEFAULT, TXPOOL_SUBPOOL_MAX_TXS_DEFAULT,
    },
    error::PoolResult,
    ordering::{CoinbaseTipOrdering, Priority, TransactionOrdering},
    pool::{
        blob_tx_priority, fee_delta, state::SubPool, AllTransactionsEvents, FullTransactionEvent,
        TransactionEvent, TransactionEvents,
    },
    traits::*,
    validate::{
        EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
        TransactionValidator, ValidPoolTransaction,
    },
};

pub mod error;
pub mod maintain;
pub mod metrics;
pub mod noop;
pub mod pool;
pub mod validate;

pub mod blobstore;
mod config;
pub mod identifier;
mod ordering;
mod traits;

#[cfg(any(test, feature = "test-utils"))]
/// Common test helpers for mocking a pool
pub mod test_utils;

/// Type alias for default ethereum transaction pool
pub type EthTransactionPool<Client, S> = Pool<
    TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
    CoinbaseTipOrdering<EthPooledTransaction>,
    S,
>;

/// A shareable, generic, customizable `TransactionPool` implementation.
#[derive(Debug)]
pub struct Pool<V, T: TransactionOrdering, S> {
    /// Arc'ed instance of the pool internals
    pool: Arc<PoolInner<V, T, S>>,
}

// === impl Pool ===

impl<V, T, S> Pool<V, T, S>
where
    V: TransactionValidator,
    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
    S: BlobStore,
{
    /// Create a new transaction pool instance.
    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
        Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
    }

    /// Returns the wrapped pool.
    pub(crate) fn inner(&self) -> &PoolInner<V, T, S> {
        &self.pool
    }

    /// Get the config the pool was configured with.
    pub fn config(&self) -> &PoolConfig {
        self.inner().config()
    }

    /// Returns future that validates all transactions in the given iterator.
    ///
    /// This returns the validated transactions in the iterator's order.
    async fn validate_all(
        &self,
        origin: TransactionOrigin,
        transactions: impl IntoIterator<Item = V::Transaction>,
    ) -> Vec<(TxHash, TransactionValidationOutcome<V::Transaction>)> {
        futures_util::future::join_all(transactions.into_iter().map(|tx| self.validate(origin, tx)))
            .await
    }

    /// Validates the given transaction
    async fn validate(
        &self,
        origin: TransactionOrigin,
        transaction: V::Transaction,
    ) -> (TxHash, TransactionValidationOutcome<V::Transaction>) {
        let hash = *transaction.hash();

        let outcome = self.pool.validator().validate_transaction(origin, transaction).await;

        (hash, outcome)
    }

    /// Number of transactions in the entire pool
    pub fn len(&self) -> usize {
        self.pool.len()
    }

    /// Whether the pool is empty
    pub fn is_empty(&self) -> bool {
        self.pool.is_empty()
    }

    /// Returns whether or not the pool is over its configured size and transaction count limits.
    pub fn is_exceeded(&self) -> bool {
        self.pool.is_exceeded()
    }
}

impl<Client, S> EthTransactionPool<Client, S>
where
    Client: StateProviderFactory + Clone + 'static,
    S: BlobStore,
{
    /// Returns a new [`Pool`] that uses the default [`TransactionValidationTaskExecutor`] when
    /// validating [`EthPooledTransaction`]s and orders via [`CoinbaseTipOrdering`]
    ///
    /// # Example
    ///
    /// ```
    /// use reth_chainspec::MAINNET;
    /// use reth_storage_api::StateProviderFactory;
    /// use reth_tasks::TokioTaskExecutor;
    /// use reth_transaction_pool::{
    ///     blobstore::InMemoryBlobStore, Pool, TransactionValidationTaskExecutor,
    /// };
    /// # fn t<C>(client: C)  where C: StateProviderFactory + Clone + 'static {
    /// let blob_store = InMemoryBlobStore::default();
    /// let pool = Pool::eth_pool(
    ///     TransactionValidationTaskExecutor::eth(
    ///         client,
    ///         MAINNET.clone(),
    ///         blob_store.clone(),
    ///         TokioTaskExecutor::default(),
    ///     ),
    ///     blob_store,
    ///     Default::default(),
    /// );
    /// # }
    /// ```
    pub fn eth_pool(
        validator: TransactionValidationTaskExecutor<
            EthTransactionValidator<Client, EthPooledTransaction>,
        >,
        blob_store: S,
        config: PoolConfig,
    ) -> Self {
        Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
    }
}

/// Implements the `TransactionPool` interface for various transaction pool API consumers.
impl<V, T, S> TransactionPool for Pool<V, T, S>
where
    V: TransactionValidator,
    <V as TransactionValidator>::Transaction: EthPoolTransaction,
    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
    S: BlobStore,
{
    type Transaction = T::Transaction;

    fn pool_size(&self) -> PoolSize {
        self.pool.size()
    }

    fn block_info(&self) -> BlockInfo {
        self.pool.block_info()
    }

    async fn add_transaction_and_subscribe(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> PoolResult<TransactionEvents> {
        let (_, tx) = self.validate(origin, transaction).await;
        self.pool.add_transaction_and_subscribe(origin, tx)
    }

    async fn add_transaction(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> PoolResult<TxHash> {
        let (_, tx) = self.validate(origin, transaction).await;
        let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
        results.pop().expect("result length is the same as the input")
    }

    async fn add_transactions(
        &self,
        origin: TransactionOrigin,
        transactions: Vec<Self::Transaction>,
    ) -> Vec<PoolResult<TxHash>> {
        if transactions.is_empty() {
            return Vec::new()
        }
        let validated = self.validate_all(origin, transactions).await;

        self.pool.add_transactions(origin, validated.into_iter().map(|(_, tx)| tx))
    }

    fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
        self.pool.add_transaction_event_listener(tx_hash)
    }

    fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
        self.pool.add_all_transactions_event_listener()
    }

    fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
        self.pool.add_pending_listener(kind)
    }

    fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
        self.pool.add_blob_sidecar_listener()
    }

    fn new_transactions_listener_for(
        &self,
        kind: TransactionListenerKind,
    ) -> Receiver<NewTransactionEvent<Self::Transaction>> {
        self.pool.add_new_transaction_listener(kind)
    }

    fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
        self.pool.pooled_transactions_hashes()
    }

    fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash> {
        self.pooled_transaction_hashes().into_iter().take(max).collect()
    }

    fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.pooled_transactions()
    }

    fn pooled_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.pooled_transactions_max(max)
    }

    fn get_pooled_transaction_elements(
        &self,
        tx_hashes: Vec<TxHash>,
        limit: GetPooledTransactionLimit,
    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> {
        self.pool.get_pooled_transaction_elements(tx_hashes, limit)
    }

    fn get_pooled_transaction_element(
        &self,
        tx_hash: TxHash,
    ) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
    {
        self.pool.get_pooled_transaction_element(tx_hash)
    }

    fn best_transactions(
        &self,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
        Box::new(self.pool.best_transactions())
    }

    fn best_transactions_with_attributes(
        &self,
        best_transactions_attributes: BestTransactionsAttributes,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
        self.pool.best_transactions_with_attributes(best_transactions_attributes)
    }

    fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.pending_transactions()
    }

    fn pending_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.pending_transactions_max(max)
    }

    fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.queued_transactions()
    }

    fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
        self.pool.all_transactions()
    }

    fn remove_transactions(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.remove_transactions(hashes)
    }

    fn remove_transactions_and_descendants(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.remove_transactions_and_descendants(hashes)
    }

    fn remove_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.remove_transactions_by_sender(sender)
    }

    fn retain_unknown<A>(&self, announcement: &mut A)
    where
        A: HandleMempoolData,
    {
        self.pool.retain_unknown(announcement)
    }

    fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.inner().get(tx_hash)
    }

    fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.inner().get_all(txs)
    }

    fn on_propagated(&self, txs: PropagatedTransactions) {
        self.inner().on_propagated(txs)
    }

    fn get_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.get_transactions_by_sender(sender)
    }

    fn get_pending_transactions_with_predicate(
        &self,
        predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.pending_transactions_with_predicate(predicate)
    }

    fn get_pending_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.get_pending_transactions_by_sender(sender)
    }

    fn get_queued_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.get_queued_transactions_by_sender(sender)
    }

    fn get_highest_transaction_by_sender(
        &self,
        sender: Address,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.get_highest_transaction_by_sender(sender)
    }

    fn get_highest_consecutive_transaction_by_sender(
        &self,
        sender: Address,
        on_chain_nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
    }

    fn get_transaction_by_sender_and_nonce(
        &self,
        sender: Address,
        nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
        let transaction_id = TransactionId::new(self.pool.get_sender_id(sender), nonce);

        self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
    }

    fn get_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.get_transactions_by_origin(origin)
    }

    /// Returns all pending transactions filtered by [`TransactionOrigin`]
    fn get_pending_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.pool.get_pending_transactions_by_origin(origin)
    }

    fn unique_senders(&self) -> HashSet<Address> {
        self.pool.unique_senders()
    }

    fn get_blob(
        &self,
        tx_hash: TxHash,
    ) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        self.pool.blob_store().get(tx_hash)
    }

    fn get_all_blobs(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        self.pool.blob_store().get_all(tx_hashes)
    }

    fn get_all_blobs_exact(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        self.pool.blob_store().get_exact(tx_hashes)
    }

    fn get_blobs_for_versioned_hashes(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        self.pool.blob_store().get_by_versioned_hashes(versioned_hashes)
    }
}

impl<V, T, S> TransactionPoolExt for Pool<V, T, S>
where
    V: TransactionValidator,
    <V as TransactionValidator>::Transaction: EthPoolTransaction,
    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
    S: BlobStore,
{
    #[instrument(skip(self), target = "txpool")]
    fn set_block_info(&self, info: BlockInfo) {
        trace!(target: "txpool", "updating pool block info");
        self.pool.set_block_info(info)
    }

    fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_>) {
        self.pool.on_canonical_state_change(update);
    }

    fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
        self.pool.update_accounts(accounts);
    }

    fn delete_blob(&self, tx: TxHash) {
        self.pool.delete_blob(tx)
    }

    fn delete_blobs(&self, txs: Vec<TxHash>) {
        self.pool.delete_blobs(txs)
    }

    fn cleanup_blobs(&self) {
        self.pool.cleanup_blobs()
    }
}

impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
    fn clone(&self) -> Self {
        Self { pool: Arc::clone(&self.pool) }
    }
}