reth_transaction_pool/
maintain.rs

1//! Support for maintaining the state of the transaction pool
2
3use crate::{
4    blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
5    error::PoolError,
6    metrics::MaintainPoolMetrics,
7    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
8    BlockInfo, PoolTransaction, PoolUpdateKind,
9};
10use alloy_consensus::BlockHeader;
11use alloy_eips::BlockNumberOrTag;
12use alloy_primitives::{Address, BlockHash, BlockNumber};
13use alloy_rlp::Encodable;
14use futures_util::{
15    future::{BoxFuture, Fuse, FusedFuture},
16    FutureExt, Stream, StreamExt,
17};
18use reth_chain_state::CanonStateNotification;
19use reth_chainspec::{ChainSpecProvider, EthChainSpec};
20use reth_execution_types::ChangedAccount;
21use reth_fs_util::FsPathError;
22use reth_primitives::{
23    transaction::SignedTransactionIntoRecoveredExt, SealedHeader, TransactionSigned,
24};
25use reth_primitives_traits::SignedTransaction;
26use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
27use reth_tasks::TaskSpawner;
28use std::{
29    borrow::Borrow,
30    collections::HashSet,
31    hash::{Hash, Hasher},
32    path::{Path, PathBuf},
33    sync::Arc,
34};
35use tokio::sync::oneshot;
36use tracing::{debug, error, info, trace, warn};
37
/// Tunable limits used while keeping the transaction pool in sync with the chain
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Largest block distance (`new.number - last_seen.number`) that is still applied to the pool
    /// as a regular update.
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Upper bound on the number of accounts reloaded from state in a single batch when updating
    /// the transaction pool.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,
}

impl Default for MaintainPoolConfig {
    /// Returns the stock limits: an update depth of 64 blocks and 100 accounts per reload batch.
    fn default() -> Self {
        let max_update_depth = 64;
        let max_reload_accounts = 100;
        Self { max_update_depth, max_reload_accounts }
    }
}
57
/// Configuration of the task that backs up local transactions
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Location of the transactions backup file; `None` means no path is configured
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Builds a config that points at the given transactions backup file
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        LocalTransactionBackupConfig { transactions_path: Some(transactions_path) }
    }
}
71
72/// Returns a spawnable future for maintaining the state of the transaction pool.
73pub fn maintain_transaction_pool_future<Client, P, St, Tasks>(
74    client: Client,
75    pool: P,
76    events: St,
77    task_spawner: Tasks,
78    config: MaintainPoolConfig,
79) -> BoxFuture<'static, ()>
80where
81    Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + Send + 'static,
82    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = TransactionSigned>> + 'static,
83    St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
84    Tasks: TaskSpawner + 'static,
85{
86    async move {
87        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
88    }
89    .boxed()
90}
91
/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly.
///
/// On start the pool's block info is pointed at the latest header (if it can be fetched). The
/// function then loops until the `events` stream ends. Each iteration:
///
/// 1. schedules a reload of dirty accounts (at most `max_reload_accounts` at once) on a task
///    spawned via `task_spawner`,
/// 2. deletes blobs that the tracker reports as finalized and triggers a blob store cleanup,
/// 3. waits for either the account reload result or the next canonical state notification,
/// 4. applies the reload result and/or the notification (commit or reorg) to the pool.
///
/// Commits whose tip is more than `max_update_depth` blocks away from the pool's last seen block
/// only update the pool's block info and mark the pool as drifted.
pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + Send + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = TransactionSigned>> + 'static,
    St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
    Tasks: TaskSpawner + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to latest state
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            // NOTE(review): `+ 12` presumably is the expected block time in seconds used to
            // estimate the next block's timestamp — confirm
            pending_basefee: latest
                .next_block_base_fee(
                    chain_spec.base_fee_params_at_timestamp(latest.timestamp() + 12),
                )
                .unwrap_or_default(),
            pending_blob_fee: latest.next_block_blob_fee(),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so we can clean up finalized transactions
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // keeps track of any dirty accounts that we know are out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of the state of the pool with respect to blocks
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads accounts from state; it starts out terminated so that the first
    // loop iteration is free to schedule a reload
    let mut reload_accounts_fut = Fuse::terminated();

    // The update loop that waits for new blocks and reorgs and performs pool updates
    // Listen for new chain events and derive the update action for the pool
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // after performing a pool update after a new block we have some time to properly update
        // dirty accounts and correct if the pool drifted from current state, for example after
        // restart or a pipeline run
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            // assuming all senders are dirty
            dirty_addresses = pool.unique_senders();
            // make sure we toggle the state back to in sync
            maintained_state = MaintainedPoolState::InSync;
        }

        // if we have accounts that are out of sync with the pool, we reload them in chunks;
        // only one reload is in flight at a time
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            // accounts are loaded at the block the pool currently tracks
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // need to chunk accounts to reload
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    // make sure we remove them from the dirty set
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // can fetch all dirty accounts at once
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            // the result is delivered through the oneshot receiver polled in the select below
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking(fut);
        }

        // check if we have a new finalized block
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten())
        {
            if let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
            {
                metrics.inc_deleted_tracked_blobs(blobs.len());
                // remove all finalized blobs from the blob store
                pool.delete_blobs(blobs);
                // and also do periodic cleanup
                let pool = pool.clone();
                task_spawner.spawn_blocking(Box::pin(async move {
                    debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                    pool.cleanup_blobs();
                }));
            }
        }

        // outcomes of the futures we are waiting on
        let mut event = None;
        let mut reloaded = None;

        // select of account reloads and new canonical state updates which should arrive at the rate
        // of the block time (12s)
        tokio::select! {
            res = &mut reload_accounts_fut =>  {
                reloaded = Some(res);
            }
            ev = events.next() =>  {
                 if ev.is_none() {
                    // the stream ended, we are done
                    break;
                }
                event = ev;
            }
        }

        // handle the result of the account reload
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // reloaded accounts successfully
                // accounts we failed to load from database stay dirty so they are retried
                dirty_addresses.extend(failed_to_load);
                // update the pool with the loaded accounts
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // Failed to load accounts from state, mark all of them as dirty again
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // failed to receive the accounts, sender dropped, only possible if task panicked
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // handle the new block or reorg; if only the reload completed, start the next iteration
        let Some(event) = event else { continue };
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // check if the reorg is not canonical with the pool's block
                if !(old_first.parent_hash == pool_info.last_seen_block_hash ||
                    new_first.parent_hash == pool_info.last_seen_block_hash)
                {
                    // neither the old nor the new chain segment builds directly on the block the
                    // pool has last seen, so the pool is flagged as drifted
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block: `new_tip+1`
                let pending_block_base_fee = new_tip
                    .next_block_base_fee(
                        chain_spec.base_fee_params_at_timestamp(new_tip.timestamp + 12),
                    )
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.next_block_blob_fee();

                // we know all changed accounts in the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // find all accounts that were changed in the old chain but _not_ in the new chain
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                // for these we need to fetch the nonce+balance from the db at the new tip
                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // extend accounts we failed to load from database
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // also include all accounts from new chain
                // we can use extend here because they are unique
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // all transactions mined in the new chain
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // update the pool then re-inject the pruned transactions
                // find all transactions that were mined in the old chain but not in the new chain
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            // reorged blobs no longer include the blob, which is necessary for
                            // validating the transaction. Even though the transaction could have
                            // been validated previously, we still need the blob in order to
                            // accurately set the transaction's
                            // encoded-length which is propagated over the network.
                            //
                            // blob transactions whose sidecar is not found in the pool's blob
                            // store are dropped here
                            pool.get_blob(TransactionSigned::hash(&tx))
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // update the pool first
                let update = CanonicalStateUpdate {
                    new_tip: &new_tip.block,
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    // all transactions mined in the new chain need to be removed from the pool
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                // all transactions that were mined in the old chain but not in the new chain need
                // to be re-injected
                //
                // Note: we no longer know if the tx was local or external
                // Because the transactions are not finalized, the corresponding blobs are still in
                // blob store (if we previously received them from the network)
                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                // the outcome of the re-insertion is intentionally ignored
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block: `tip+1`
                let pending_block_base_fee = tip
                    .next_block_base_fee(
                        chain_spec.base_fee_params_at_timestamp(tip.timestamp + 12),
                    )
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.next_block_blob_fee();

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number,
                    tip = tip.number,
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // check if the depth is too large and should be skipped, this could happen after
                // initial sync or long re-sync
                let depth = tip.number.abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    // only move the pool's block info forward; the drifted flag makes the next
                    // iteration treat all senders as dirty
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.gas_limit,
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number,
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // we can always clear the dirty flag for this account
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check if the range of the commit is canonical with the pool's block
                if first_block.parent_hash != pool_info.last_seen_block_hash {
                    // we received a new canonical chain commit but the commit is not canonical with
                    // the pool's block, this could happen after initial sync or
                    // long re-sync
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // Canonical update
                let update = CanonicalStateUpdate {
                    new_tip: &tip.block,
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);
            }
        }
    }
}
448
449struct FinalizedBlockTracker {
450    last_finalized_block: Option<BlockNumber>,
451}
452
453impl FinalizedBlockTracker {
454    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
455        Self { last_finalized_block }
456    }
457
458    /// Updates the tracked finalized block and returns the new finalized block if it changed
459    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
460        let finalized = finalized_block?;
461        self.last_finalized_block
462            .replace(finalized)
463            .is_none_or(|last| last < finalized)
464            .then_some(finalized)
465    }
466}
467
/// Describes whether the accounts tracked by the pool are believed to match the actual state.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// The pool is assumed to match the current state
    InSync,
    /// The pool may have diverged from the state
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` only for the [`Self::Drifted`] variant.
    #[inline]
    const fn is_drifted(&self) -> bool {
        match self {
            Self::Drifted => true,
            Self::InSync => false,
        }
    }
}
485
486/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
487#[derive(Eq)]
488struct ChangedAccountEntry(ChangedAccount);
489
490impl PartialEq for ChangedAccountEntry {
491    fn eq(&self, other: &Self) -> bool {
492        self.0.address == other.0.address
493    }
494}
495
496impl Hash for ChangedAccountEntry {
497    fn hash<H: Hasher>(&self, state: &mut H) {
498        self.0.address.hash(state);
499    }
500}
501
502impl Borrow<Address> for ChangedAccountEntry {
503    fn borrow(&self) -> &Address {
504        &self.0.address
505    }
506}
507
/// Outcome of a bulk account lookup performed by [`load_accounts`]
#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded; addresses without an account in state are represented by
    /// an empty [`ChangedAccount`]
    accounts: Vec<ChangedAccount>,
    /// All addresses for which the lookup returned an error
    failed_to_load: Vec<Address>,
}
515
516/// Loads all accounts at the given state
517///
518/// Returns an error with all given addresses if the state is not available.
519///
520/// Note: this expects _unique_ addresses
521fn load_accounts<Client, I>(
522    client: Client,
523    at: BlockHash,
524    addresses: I,
525) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
526where
527    I: IntoIterator<Item = Address>,
528
529    Client: StateProviderFactory,
530{
531    let addresses = addresses.into_iter();
532    let mut res = LoadedAccounts::default();
533    let state = match client.history_by_block_hash(at) {
534        Ok(state) => state,
535        Err(err) => return Err(Box::new((addresses.collect(), err))),
536    };
537    for addr in addresses {
538        if let Ok(maybe_acc) = state.basic_account(addr) {
539            let acc = maybe_acc
540                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
541                .unwrap_or_else(|| ChangedAccount::empty(addr));
542            res.accounts.push(acc)
543        } else {
544            // failed to load account.
545            res.failed_to_load.push(addr);
546        }
547    }
548    Ok(res)
549}
550
551/// Loads transactions from a file, decodes them from the RLP format, and inserts them
552/// into the transaction pool on node boot up.
553/// The file is removed after the transactions have been successfully processed.
554async fn load_and_reinsert_transactions<P>(
555    pool: P,
556    file_path: &Path,
557) -> Result<(), TransactionsBackupError>
558where
559    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
560{
561    if !file_path.exists() {
562        return Ok(())
563    }
564
565    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
566    let data = reth_fs_util::read(file_path)?;
567
568    if data.is_empty() {
569        return Ok(())
570    }
571
572    let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
573        alloy_rlp::Decodable::decode(&mut data.as_slice())?;
574
575    let pool_transactions = txs_signed
576        .into_iter()
577        .filter_map(|tx| tx.try_ecrecovered())
578        .filter_map(|tx| {
579            // Filter out errors
580            <P::Transaction as PoolTransaction>::try_from_consensus(tx).ok()
581        })
582        .collect();
583
584    let outcome = pool.add_transactions(crate::TransactionOrigin::Local, pool_transactions).await;
585
586    info!(target: "txpool", txs_file =?file_path, num_txs=%outcome.len(), "Successfully reinserted local transactions from file");
587    reth_fs_util::remove_file(file_path)?;
588    Ok(())
589}
590
591fn save_local_txs_backup<P>(pool: P, file_path: &Path)
592where
593    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
594{
595    let local_transactions = pool.get_local_transactions();
596    if local_transactions.is_empty() {
597        trace!(target: "txpool", "no local transactions to save");
598        return
599    }
600
601    let local_transactions = local_transactions
602        .into_iter()
603        .map(|tx| tx.transaction.clone_into_consensus().into_signed())
604        .collect::<Vec<_>>();
605
606    let num_txs = local_transactions.len();
607    let mut buf = Vec::new();
608    alloy_rlp::encode_list(&local_transactions, &mut buf);
609    info!(target: "txpool", txs_file =?file_path, num_txs=%num_txs, "Saving current local transactions");
610    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
611
612    match parent_dir.map(|_| reth_fs_util::write(file_path, buf)) {
613        Ok(_) => {
614            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
615        }
616        Err(err) => {
617            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
618        }
619    }
620}
621
/// Errors that can occur while loading and decoding the local transactions backup
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// The backup contents could not be RLP-decoded into a list of transactions
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// A filesystem operation on the backup file failed
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Error adding transactions to the transaction pool
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}
635
636/// Task which manages saving local transactions to the persistent file in case of shutdown.
637/// Reloads the transactions from the file on the boot up and inserts them into the pool.
638pub async fn backup_local_transactions_task<P>(
639    shutdown: reth_tasks::shutdown::GracefulShutdown,
640    pool: P,
641    config: LocalTransactionBackupConfig,
642) where
643    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
644{
645    let Some(transactions_path) = config.transactions_path else {
646        // nothing to do
647        return
648    };
649
650    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
651        error!(target: "txpool", "{}", err)
652    }
653
654    let graceful_guard = shutdown.await;
655
656    // write transactions to disk
657    save_local_txs_backup(pool, &transactions_path);
658
659    drop(graceful_guard)
660}
661
/// Unit tests for the pool maintenance helpers: account entry identity, the local transactions
/// backup task and the finalized block tracker.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_chainspec::MAINNET;
    use reth_fs_util as fs;
    use reth_primitives::PooledTransactionsElement;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::TaskManager;

    /// Entries with the same address compare equal even if other fields (here: the nonce) differ.
    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    // name and extension of the backup file created inside the temporary directory
    const EXTENSION: &str = "rlp";
    const FILENAME: &str = "test_transactions_backup";

    /// Adds one local transaction to a pool, runs the backup task until graceful shutdown and
    /// checks that the backup file decodes to exactly one transaction.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        // EIP-2718 encoded transaction used as the single local transaction
        let tx_bytes = hex!("02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507");
        let tx = PooledTransactionsElement::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default();
        let transaction: EthPooledTransaction = tx.try_into_ecrecovered().unwrap().into();
        let tx_to_cmp = transaction.clone();
        // register the sender account with the mock provider
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(MAINNET.clone())
            .build(provider, blob_store.clone());

        let txpool = Pool::new(
            validator.clone(),
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        // run the backup task on a task manager so it receives the graceful shutdown signal
        let handle = tokio::runtime::Handle::current();
        let manager = TaskManager::new(handle);
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        // shutdown the executor
        manager.graceful_shutdown();

        // the backup file is read right after the graceful shutdown call returns
        let data = fs::read(transactions_path).unwrap();

        let txs: Vec<TransactionSigned> =
            alloy_rlp::Decodable::decode(&mut data.as_slice()).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    /// A higher finalized block is reported and stored.
    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    /// A lower finalized block is not reported, but it still overwrites the tracked value.
    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    /// An unchanged finalized block is not reported.
    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    /// The first finalized block seen by an empty tracker is reported.
    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    /// A `None` update leaves the tracked value untouched.
    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    /// A `None` update on an empty tracker keeps it empty.
    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}