1use crate::{
4 blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
5 error::PoolError,
6 metrics::MaintainPoolMetrics,
7 traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
8 BlockInfo, PoolTransaction, PoolUpdateKind,
9};
10use alloy_consensus::BlockHeader;
11use alloy_eips::BlockNumberOrTag;
12use alloy_primitives::{Address, BlockHash, BlockNumber};
13use alloy_rlp::Encodable;
14use futures_util::{
15 future::{BoxFuture, Fuse, FusedFuture},
16 FutureExt, Stream, StreamExt,
17};
18use reth_chain_state::CanonStateNotification;
19use reth_chainspec::{ChainSpecProvider, EthChainSpec};
20use reth_execution_types::ChangedAccount;
21use reth_fs_util::FsPathError;
22use reth_primitives::{
23 transaction::SignedTransactionIntoRecoveredExt, SealedHeader, TransactionSigned,
24};
25use reth_primitives_traits::SignedTransaction;
26use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
27use reth_tasks::TaskSpawner;
28use std::{
29 borrow::Borrow,
30 collections::HashSet,
31 hash::{Hash, Hasher},
32 path::{Path, PathBuf},
33 sync::Arc,
34};
35use tokio::sync::oneshot;
36use tracing::{debug, error, info, trace, warn};
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub struct MaintainPoolConfig {
41 pub max_update_depth: u64,
46 pub max_reload_accounts: usize,
50}
51
52impl Default for MaintainPoolConfig {
53 fn default() -> Self {
54 Self { max_update_depth: 64, max_reload_accounts: 100 }
55 }
56}
57
58#[derive(Debug, Clone, Default)]
60pub struct LocalTransactionBackupConfig {
61 pub transactions_path: Option<PathBuf>,
63}
64
65impl LocalTransactionBackupConfig {
66 pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
68 Self { transactions_path: Some(transactions_path) }
69 }
70}
71
72pub fn maintain_transaction_pool_future<Client, P, St, Tasks>(
74 client: Client,
75 pool: P,
76 events: St,
77 task_spawner: Tasks,
78 config: MaintainPoolConfig,
79) -> BoxFuture<'static, ()>
80where
81 Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + Send + 'static,
82 P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = TransactionSigned>> + 'static,
83 St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
84 Tasks: TaskSpawner + 'static,
85{
86 async move {
87 maintain_transaction_pool(client, pool, events, task_spawner, config).await;
88 }
89 .boxed()
90}
91
92pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
96 client: Client,
97 pool: P,
98 mut events: St,
99 task_spawner: Tasks,
100 config: MaintainPoolConfig,
101) where
102 Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + Send + 'static,
103 P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = TransactionSigned>> + 'static,
104 St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
105 Tasks: TaskSpawner + 'static,
106{
107 let metrics = MaintainPoolMetrics::default();
108 let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
109 if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
111 let latest = SealedHeader::seal(latest);
112 let chain_spec = client.chain_spec();
113 let info = BlockInfo {
114 block_gas_limit: latest.gas_limit(),
115 last_seen_block_hash: latest.hash(),
116 last_seen_block_number: latest.number(),
117 pending_basefee: latest
118 .next_block_base_fee(
119 chain_spec.base_fee_params_at_timestamp(latest.timestamp() + 12),
120 )
121 .unwrap_or_default(),
122 pending_blob_fee: latest.next_block_blob_fee(),
123 };
124 pool.set_block_info(info);
125 }
126
127 let mut blob_store_tracker = BlobStoreCanonTracker::default();
129
130 let mut last_finalized_block =
132 FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());
133
134 let mut dirty_addresses = HashSet::default();
136
137 let mut maintained_state = MaintainedPoolState::InSync;
139
140 let mut reload_accounts_fut = Fuse::terminated();
142
143 loop {
146 trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");
147
148 metrics.set_dirty_accounts_len(dirty_addresses.len());
149 let pool_info = pool.block_info();
150
151 if maintained_state.is_drifted() {
155 metrics.inc_drift();
156 dirty_addresses = pool.unique_senders();
158 maintained_state = MaintainedPoolState::InSync;
160 }
161
162 if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
164 let (tx, rx) = oneshot::channel();
165 let c = client.clone();
166 let at = pool_info.last_seen_block_hash;
167 let fut = if dirty_addresses.len() > max_reload_accounts {
168 let accs_to_reload =
170 dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
171 for acc in &accs_to_reload {
172 dirty_addresses.remove(acc);
174 }
175 async move {
176 let res = load_accounts(c, at, accs_to_reload.into_iter());
177 let _ = tx.send(res);
178 }
179 .boxed()
180 } else {
181 let accs_to_reload = std::mem::take(&mut dirty_addresses);
183 async move {
184 let res = load_accounts(c, at, accs_to_reload.into_iter());
185 let _ = tx.send(res);
186 }
187 .boxed()
188 };
189 reload_accounts_fut = rx.fuse();
190 task_spawner.spawn_blocking(fut);
191 }
192
193 if let Some(finalized) =
195 last_finalized_block.update(client.finalized_block_number().ok().flatten())
196 {
197 if let BlobStoreUpdates::Finalized(blobs) =
198 blob_store_tracker.on_finalized_block(finalized)
199 {
200 metrics.inc_deleted_tracked_blobs(blobs.len());
201 pool.delete_blobs(blobs);
203 let pool = pool.clone();
205 task_spawner.spawn_blocking(Box::pin(async move {
206 debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
207 pool.cleanup_blobs();
208 }));
209 }
210 }
211
212 let mut event = None;
214 let mut reloaded = None;
215
216 tokio::select! {
219 res = &mut reload_accounts_fut => {
220 reloaded = Some(res);
221 }
222 ev = events.next() => {
223 if ev.is_none() {
224 break;
226 }
227 event = ev;
228 }
229 }
230
231 match reloaded {
233 Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
234 dirty_addresses.extend(failed_to_load);
237 pool.update_accounts(accounts);
239 }
240 Some(Ok(Err(res))) => {
241 let (accs, err) = *res;
243 debug!(target: "txpool", %err, "failed to load accounts");
244 dirty_addresses.extend(accs);
245 }
246 Some(Err(_)) => {
247 maintained_state = MaintainedPoolState::Drifted;
249 }
250 None => {}
251 }
252
253 let Some(event) = event else { continue };
255 match event {
256 CanonStateNotification::Reorg { old, new } => {
257 let (old_blocks, old_state) = old.inner();
258 let (new_blocks, new_state) = new.inner();
259 let new_tip = new_blocks.tip();
260 let new_first = new_blocks.first();
261 let old_first = old_blocks.first();
262
263 if !(old_first.parent_hash == pool_info.last_seen_block_hash ||
265 new_first.parent_hash == pool_info.last_seen_block_hash)
266 {
267 maintained_state = MaintainedPoolState::Drifted;
269 }
270
271 let chain_spec = client.chain_spec();
272
273 let pending_block_base_fee = new_tip
275 .next_block_base_fee(
276 chain_spec.base_fee_params_at_timestamp(new_tip.timestamp + 12),
277 )
278 .unwrap_or_default();
279 let pending_block_blob_fee = new_tip.next_block_blob_fee();
280
281 let new_changed_accounts: HashSet<_> =
283 new_state.changed_accounts().map(ChangedAccountEntry).collect();
284
285 let missing_changed_acc = old_state
287 .accounts_iter()
288 .map(|(a, _)| a)
289 .filter(|addr| !new_changed_accounts.contains(addr));
290
291 let mut changed_accounts =
293 match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
294 Ok(LoadedAccounts { accounts, failed_to_load }) => {
295 dirty_addresses.extend(failed_to_load);
297
298 accounts
299 }
300 Err(err) => {
301 let (addresses, err) = *err;
302 debug!(
303 target: "txpool",
304 %err,
305 "failed to load missing changed accounts at new tip: {:?}",
306 new_tip.hash()
307 );
308 dirty_addresses.extend(addresses);
309 vec![]
310 }
311 };
312
313 changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));
316
317 let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();
319
320 let pruned_old_transactions = old_blocks
323 .transactions_ecrecovered()
324 .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
325 .filter_map(|tx| {
326 if tx.is_eip4844() {
327 pool.get_blob(TransactionSigned::hash(&tx))
333 .ok()
334 .flatten()
335 .map(Arc::unwrap_or_clone)
336 .and_then(|sidecar| {
337 <P as TransactionPool>::Transaction::try_from_eip4844(
338 tx, sidecar,
339 )
340 })
341 } else {
342 <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
343 }
344 })
345 .collect::<Vec<_>>();
346
347 let update = CanonicalStateUpdate {
349 new_tip: &new_tip.block,
350 pending_block_base_fee,
351 pending_block_blob_fee,
352 changed_accounts,
353 mined_transactions: new_blocks.transaction_hashes().collect(),
355 update_kind: PoolUpdateKind::Reorg,
356 };
357 pool.on_canonical_state_change(update);
358
359 metrics.inc_reinserted_transactions(pruned_old_transactions.len());
366 let _ = pool.add_external_transactions(pruned_old_transactions).await;
367
368 blob_store_tracker.add_new_chain_blocks(&new_blocks);
370 }
371 CanonStateNotification::Commit { new } => {
372 let (blocks, state) = new.inner();
373 let tip = blocks.tip();
374 let chain_spec = client.chain_spec();
375
376 let pending_block_base_fee = tip
378 .next_block_base_fee(
379 chain_spec.base_fee_params_at_timestamp(tip.timestamp + 12),
380 )
381 .unwrap_or_default();
382 let pending_block_blob_fee = tip.next_block_blob_fee();
383
384 let first_block = blocks.first();
385 trace!(
386 target: "txpool",
387 first = first_block.number,
388 tip = tip.number,
389 pool_block = pool_info.last_seen_block_number,
390 "update pool on new commit"
391 );
392
393 let depth = tip.number.abs_diff(pool_info.last_seen_block_number);
396 if depth > max_update_depth {
397 maintained_state = MaintainedPoolState::Drifted;
398 debug!(target: "txpool", ?depth, "skipping deep canonical update");
399 let info = BlockInfo {
400 block_gas_limit: tip.gas_limit,
401 last_seen_block_hash: tip.hash(),
402 last_seen_block_number: tip.number,
403 pending_basefee: pending_block_base_fee,
404 pending_blob_fee: pending_block_blob_fee,
405 };
406 pool.set_block_info(info);
407
408 blob_store_tracker.add_new_chain_blocks(&blocks);
410
411 continue
412 }
413
414 let mut changed_accounts = Vec::with_capacity(state.state().len());
415 for acc in state.changed_accounts() {
416 dirty_addresses.remove(&acc.address);
418 changed_accounts.push(acc);
419 }
420
421 let mined_transactions = blocks.transaction_hashes().collect();
422
423 if first_block.parent_hash != pool_info.last_seen_block_hash {
425 maintained_state = MaintainedPoolState::Drifted;
429 }
430
431 let update = CanonicalStateUpdate {
433 new_tip: &tip.block,
434 pending_block_base_fee,
435 pending_block_blob_fee,
436 changed_accounts,
437 mined_transactions,
438 update_kind: PoolUpdateKind::Commit,
439 };
440 pool.on_canonical_state_change(update);
441
442 blob_store_tracker.add_new_chain_blocks(&blocks);
444 }
445 }
446 }
447}
448
449struct FinalizedBlockTracker {
450 last_finalized_block: Option<BlockNumber>,
451}
452
453impl FinalizedBlockTracker {
454 const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
455 Self { last_finalized_block }
456 }
457
458 fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
460 let finalized = finalized_block?;
461 self.last_finalized_block
462 .replace(finalized)
463 .is_none_or(|last| last < finalized)
464 .then_some(finalized)
465 }
466}
467
468#[derive(Debug, PartialEq, Eq)]
471enum MaintainedPoolState {
472 InSync,
474 Drifted,
476}
477
478impl MaintainedPoolState {
479 #[inline]
481 const fn is_drifted(&self) -> bool {
482 matches!(self, Self::Drifted)
483 }
484}
485
486#[derive(Eq)]
488struct ChangedAccountEntry(ChangedAccount);
489
490impl PartialEq for ChangedAccountEntry {
491 fn eq(&self, other: &Self) -> bool {
492 self.0.address == other.0.address
493 }
494}
495
496impl Hash for ChangedAccountEntry {
497 fn hash<H: Hasher>(&self, state: &mut H) {
498 self.0.address.hash(state);
499 }
500}
501
502impl Borrow<Address> for ChangedAccountEntry {
503 fn borrow(&self) -> &Address {
504 &self.0.address
505 }
506}
507
508#[derive(Default)]
509struct LoadedAccounts {
510 accounts: Vec<ChangedAccount>,
512 failed_to_load: Vec<Address>,
514}
515
516fn load_accounts<Client, I>(
522 client: Client,
523 at: BlockHash,
524 addresses: I,
525) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
526where
527 I: IntoIterator<Item = Address>,
528
529 Client: StateProviderFactory,
530{
531 let addresses = addresses.into_iter();
532 let mut res = LoadedAccounts::default();
533 let state = match client.history_by_block_hash(at) {
534 Ok(state) => state,
535 Err(err) => return Err(Box::new((addresses.collect(), err))),
536 };
537 for addr in addresses {
538 if let Ok(maybe_acc) = state.basic_account(addr) {
539 let acc = maybe_acc
540 .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
541 .unwrap_or_else(|| ChangedAccount::empty(addr));
542 res.accounts.push(acc)
543 } else {
544 res.failed_to_load.push(addr);
546 }
547 }
548 Ok(res)
549}
550
551async fn load_and_reinsert_transactions<P>(
555 pool: P,
556 file_path: &Path,
557) -> Result<(), TransactionsBackupError>
558where
559 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
560{
561 if !file_path.exists() {
562 return Ok(())
563 }
564
565 debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
566 let data = reth_fs_util::read(file_path)?;
567
568 if data.is_empty() {
569 return Ok(())
570 }
571
572 let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
573 alloy_rlp::Decodable::decode(&mut data.as_slice())?;
574
575 let pool_transactions = txs_signed
576 .into_iter()
577 .filter_map(|tx| tx.try_ecrecovered())
578 .filter_map(|tx| {
579 <P::Transaction as PoolTransaction>::try_from_consensus(tx).ok()
581 })
582 .collect();
583
584 let outcome = pool.add_transactions(crate::TransactionOrigin::Local, pool_transactions).await;
585
586 info!(target: "txpool", txs_file =?file_path, num_txs=%outcome.len(), "Successfully reinserted local transactions from file");
587 reth_fs_util::remove_file(file_path)?;
588 Ok(())
589}
590
591fn save_local_txs_backup<P>(pool: P, file_path: &Path)
592where
593 P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
594{
595 let local_transactions = pool.get_local_transactions();
596 if local_transactions.is_empty() {
597 trace!(target: "txpool", "no local transactions to save");
598 return
599 }
600
601 let local_transactions = local_transactions
602 .into_iter()
603 .map(|tx| tx.transaction.clone_into_consensus().into_signed())
604 .collect::<Vec<_>>();
605
606 let num_txs = local_transactions.len();
607 let mut buf = Vec::new();
608 alloy_rlp::encode_list(&local_transactions, &mut buf);
609 info!(target: "txpool", txs_file =?file_path, num_txs=%num_txs, "Saving current local transactions");
610 let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
611
612 match parent_dir.map(|_| reth_fs_util::write(file_path, buf)) {
613 Ok(_) => {
614 info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
615 }
616 Err(err) => {
617 warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
618 }
619 }
620}
621
622#[derive(thiserror::Error, Debug)]
624pub enum TransactionsBackupError {
625 #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
627 Decode(#[from] alloy_rlp::Error),
628 #[error("failed to apply transactions backup. Encountered file error: {0}")]
630 FsPath(#[from] FsPathError),
631 #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
633 Pool(#[from] PoolError),
634}
635
636pub async fn backup_local_transactions_task<P>(
639 shutdown: reth_tasks::shutdown::GracefulShutdown,
640 pool: P,
641 config: LocalTransactionBackupConfig,
642) where
643 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
644{
645 let Some(transactions_path) = config.transactions_path else {
646 return
648 };
649
650 if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
651 error!(target: "txpool", "{}", err)
652 }
653
654 let graceful_guard = shutdown.await;
655
656 save_local_txs_backup(pool, &transactions_path);
658
659 drop(graceful_guard)
660}
661
662#[cfg(test)]
663mod tests {
664 use super::*;
665 use crate::{
666 blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
667 CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
668 };
669 use alloy_eips::eip2718::Decodable2718;
670 use alloy_primitives::{hex, U256};
671 use reth_chainspec::MAINNET;
672 use reth_fs_util as fs;
673 use reth_primitives::PooledTransactionsElement;
674 use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
675 use reth_tasks::TaskManager;
676
677 #[test]
678 fn changed_acc_entry() {
679 let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
680 let mut copy = changed_acc.0;
681 copy.nonce = 10;
682 assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
683 }
684
685 const EXTENSION: &str = "rlp";
686 const FILENAME: &str = "test_transactions_backup";
687
688 #[tokio::test(flavor = "multi_thread")]
689 async fn test_save_local_txs_backup() {
690 let temp_dir = tempfile::tempdir().unwrap();
691 let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
692 let tx_bytes = hex!("02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507");
693 let tx = PooledTransactionsElement::decode_2718(&mut &tx_bytes[..]).unwrap();
694 let provider = MockEthProvider::default();
695 let transaction: EthPooledTransaction = tx.try_into_ecrecovered().unwrap().into();
696 let tx_to_cmp = transaction.clone();
697 let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
698 provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
699 let blob_store = InMemoryBlobStore::default();
700 let validator = EthTransactionValidatorBuilder::new(MAINNET.clone())
701 .build(provider, blob_store.clone());
702
703 let txpool = Pool::new(
704 validator.clone(),
705 CoinbaseTipOrdering::default(),
706 blob_store.clone(),
707 Default::default(),
708 );
709
710 txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();
711
712 let handle = tokio::runtime::Handle::current();
713 let manager = TaskManager::new(handle);
714 let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
715 manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
716 backup_local_transactions_task(shutdown, txpool.clone(), config)
717 });
718
719 let mut txns = txpool.get_local_transactions();
720 let tx_on_finish = txns.pop().expect("there should be 1 transaction");
721
722 assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());
723
724 manager.graceful_shutdown();
726
727 let data = fs::read(transactions_path).unwrap();
728
729 let txs: Vec<TransactionSigned> =
730 alloy_rlp::Decodable::decode(&mut data.as_slice()).unwrap();
731 assert_eq!(txs.len(), 1);
732
733 temp_dir.close().unwrap();
734 }
735
736 #[test]
737 fn test_update_with_higher_finalized_block() {
738 let mut tracker = FinalizedBlockTracker::new(Some(10));
739 assert_eq!(tracker.update(Some(15)), Some(15));
740 assert_eq!(tracker.last_finalized_block, Some(15));
741 }
742
743 #[test]
744 fn test_update_with_lower_finalized_block() {
745 let mut tracker = FinalizedBlockTracker::new(Some(20));
746 assert_eq!(tracker.update(Some(15)), None);
747 assert_eq!(tracker.last_finalized_block, Some(15));
748 }
749
750 #[test]
751 fn test_update_with_equal_finalized_block() {
752 let mut tracker = FinalizedBlockTracker::new(Some(20));
753 assert_eq!(tracker.update(Some(20)), None);
754 assert_eq!(tracker.last_finalized_block, Some(20));
755 }
756
757 #[test]
758 fn test_update_with_no_last_finalized_block() {
759 let mut tracker = FinalizedBlockTracker::new(None);
760 assert_eq!(tracker.update(Some(10)), Some(10));
761 assert_eq!(tracker.last_finalized_block, Some(10));
762 }
763
764 #[test]
765 fn test_update_with_no_new_finalized_block() {
766 let mut tracker = FinalizedBlockTracker::new(Some(10));
767 assert_eq!(tracker.update(None), None);
768 assert_eq!(tracker.last_finalized_block, Some(10));
769 }
770
771 #[test]
772 fn test_update_with_no_finalized_blocks() {
773 let mut tracker = FinalizedBlockTracker::new(None);
774 assert_eq!(tracker.update(None), None);
775 assert_eq!(tracker.last_finalized_block, None);
776 }
777}