reth_engine_tree/
backup.rs

1//! reth's database backup functionality
2use alloy_eips::BlockNumHash;
3use reth_errors::ProviderError;
4use reth_node_core::dirs::{ChainPath, DataDirPath};
5use std::{
6    path::PathBuf,
7    sync::mpsc::{Receiver, Sender},
8    time::Instant,
9};
10use thiserror::Error;
11use tokio::sync::oneshot;
12use tracing::*;
13
/// Paths describing where a backup is read from and where it is written to.
#[derive(Clone, Debug)]
pub struct BackupConfig {
    /// Directory whose contents are backed up.
    pub source_dir: PathBuf,
    /// Directory that receives the backup.
    pub dest_dir: PathBuf,
}
22
23/// Service that handles database backups based on block events
24#[derive(Debug)]
25pub struct BackupService {
26    /// Incoming backup requests
27    incoming: Receiver<BackupAction>,
28    /// The data directory for the engine tree.
29    data_dir: ChainPath<DataDirPath>,
30}
31/// A signal to the backup service that a backup should be performed.
32#[derive(Debug)]
33pub enum BackupAction {
34    /// Perform a backup at the given block number.
35    BackupAtBlock(BlockNumHash, oneshot::Sender<Option<BlockNumHash>>),
36}
37impl BackupService {
38    /// Create a new backup service
39    pub fn new(incoming: Receiver<BackupAction>, data_dir: ChainPath<DataDirPath>) -> Self {
40        Self { incoming, data_dir }
41    }
42
43    /// Main loop that processes backup actions
44    pub fn run(self) -> Result<(), ProviderError> {
45        debug!(target: "engine::backup", service=?self, "Backup service starting to run");
46        while let Ok(action) = self.incoming.recv() {
47            debug!(target: "engine::backup", action=?action, "Backup service received action");
48            match action {
49                BackupAction::BackupAtBlock(block_number, sender) => {
50                    let result = self.perform_backup(block_number);
51                    if let Err(e) = result {
52                        error!(target: "engine::backup", ?e, "Backup failed");
53                        let _ = sender.send(None);
54                    } else {
55                        let _ = sender.send(Some(block_number));
56                    }
57                }
58            }
59        }
60        Ok(())
61    }
62
63    /// Perform the actual backup operation
64    fn perform_backup(&self, block_number: BlockNumHash) -> Result<(), ProviderError> {
65        debug!(target: "engine::backup", ?block_number, "Starting backup");
66        let backup_path = PathBuf::from(format!("{}_backup", self.data_dir.data_dir().display(),));
67
68        // Perform the actual backup using the provider
69        BackupService::backup_dir(&PathBuf::from(self.data_dir.data_dir()), &backup_path)?;
70
71        info!(
72            target: "engine::backup",
73            ?block_number,
74            "Backup completed successfully"
75        );
76
77        Ok(())
78    }
79
80    /// Recursively copies the source directory to the destination directory.
81    ///
82    /// This function uses asynchronous file operations to perform the backup.
83    ///
84    /// # Arguments
85    ///
86    /// * `source` - The source directory to backup.
87    /// * `destination` - The destination directory where the backup will be stored.
88    ///
89    /// # Returns
90    ///
91    /// * `Ok(())` if the backup is successful.
92    /// * `Err(anyhow::Error)` if an error occurs during the backup.
93    pub fn backup_dir(source: &PathBuf, destination: &PathBuf) -> Result<(), ProviderError> {
94        debug!(target: "engine::backup", ?source, ?destination);
95
96        let source_path = source.as_path();
97        let destination_path = destination.as_path();
98
99        // Retrieve the metadata of the source path
100        let metadata = std::fs::metadata(source_path)
101            .expect(&format!("Failed to access source path: {} ", source_path.display(),));
102
103        // If the source is a directory, create the destination directory if it does not exist
104        if metadata.is_dir() {
105            if !destination_path.exists() {
106                std::fs::create_dir_all(destination_path)
107                    .expect(&format!("Failed to create destination directory"));
108            }
109
110            // Stack to manage recursive copying
111            let mut entries_stack =
112                vec![(source_path.to_path_buf(), destination_path.to_path_buf())];
113
114            while let Some((current_src, current_dst)) = entries_stack.pop() {
115                let mut entries = std::fs::read_dir(&current_src)
116                    .expect(&format!("Failed to read directory {}", current_src.display(),));
117
118                while let Some(entry) =
119                    entries.next().transpose().expect(&format!("Failed to get diredctory entry"))
120                {
121                    let entry_path = entry.path();
122                    let entry_name = entry.file_name();
123                    let dst_path = current_dst.join(&entry_name);
124                    let entry_metadata =
125                        entry.metadata().expect(&format!("Failed to get diredctory entry"));
126
127                    if entry_metadata.is_dir() {
128                        if !dst_path.exists() {
129                            std::fs::create_dir_all(&dst_path).expect(&format!(
130                                "Failed to create directory {}",
131                                dst_path.display(),
132                            ));
133                        }
134                        entries_stack.push((entry_path, dst_path));
135                    } else {
136                        std::fs::copy(&entry_path, &dst_path).expect(&format!(
137                            "Failed to copy file from {} to {}",
138                            entry_path.display(),
139                            dst_path.display(),
140                        ));
141                    }
142                }
143            }
144        } else {
145            // If the source is a file, copy it directly, creating parent directories if necessary
146            if let Some(parent) = destination_path.parent() {
147                if !parent.exists() {
148                    std::fs::create_dir_all(parent)
149                        .expect(
150                            &format!("Failed to create parent directory {}", parent.display(),),
151                        );
152                }
153            }
154            std::fs::copy(source_path, destination_path).expect(&format!(
155                "Failed to copy file from {} to {}",
156                source_path.display(),
157                destination_path.display(),
158            ));
159        }
160
161        Ok(())
162    }
163}
164
165/// Errors that can occur during backup operations
166#[derive(Debug, Error)]
167pub enum BackupError {
168    /// IO error
169    #[error(transparent)]
170    Io(#[from] std::io::Error),
171    /// Provider error
172    #[error(transparent)]
173    Provider(#[from] reth_provider::ProviderError),
174}
175
176/// Handle to interact with the backup service
177#[derive(Debug)]
178pub struct BackupHandle {
179    /// The sender for backup actions
180    pub sender: Sender<BackupAction>,
181    /// The receiver from backup service
182    pub rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>,
183    /// The latest backup block number
184    pub latest_backup_block: BlockNumHash,
185}
186
187impl BackupHandle {
188    /// Create a new backup handle
189    pub fn new(sender: Sender<BackupAction>) -> Self {
190        Self { sender, rx: None, latest_backup_block: BlockNumHash::default() }
191    }
192
193    /// Spawn a new backup service
194    pub fn spawn_service(data_dir: ChainPath<DataDirPath>) -> BackupHandle {
195        let (tx, rx) = std::sync::mpsc::channel();
196        let handle = BackupHandle::new(tx);
197
198        let service = BackupService::new(rx, data_dir);
199        std::thread::Builder::new()
200            .name("Backup Service".to_string())
201            .spawn(move || {
202                if let Err(err) = service.run() {
203                    error!(target: "engine::backup", ?err, "Backup service failed");
204                }
205            })
206            .unwrap();
207
208        handle
209    }
210
211    /// Checks if a backup is currently in progress.
212    pub fn in_progress(&self) -> bool {
213        self.rx.is_some()
214    }
215
216    /// Sets state for a started backup task.
217    pub(crate) fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) {
218        self.rx = Some((rx, Instant::now()));
219    }
220
221    /// Sets state for a finished backup task.
222    pub fn finish(&mut self, block_number: BlockNumHash) {
223        self.latest_backup_block = block_number;
224        self.rx = None;
225    }
226}