reth_engine_tree/
backup.rs

1//! reth's database backup functionality
2use alloy_eips::BlockNumHash;
3use reth_errors::ProviderError;
4use reth_node_core::dirs::{ChainPath, DataDirPath};
5use reth_tracing::tracing::*;
6use std::{
7    path::PathBuf,
8    sync::mpsc::{Receiver, Sender},
9    time::Instant,
10};
11use thiserror::Error;
12use tokio::sync::oneshot;
13
/// Configuration for the backup service.
///
/// Pairs the directory that should be backed up with the directory the backup is written to.
// NOTE(review): nothing in the visible part of this file reads `BackupConfig`; `BackupService`
// derives both paths from its `data_dir` instead. Confirm whether this type is used elsewhere.
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Source directory to backup
    pub source_dir: PathBuf,
    /// Destination directory for backups
    pub dest_dir: PathBuf,
}
22
/// Service that handles database backups based on block events.
///
/// The service owns the receiving end of a channel of [`BackupAction`]s and, for each request,
/// copies the data directory to a sibling directory whose name carries a `_backup` suffix.
#[derive(Debug)]
pub struct BackupService {
    /// Incoming backup requests; the service stops once every sender has been dropped.
    incoming: Receiver<BackupAction>,
    /// The data directory for the engine tree. This is the directory that gets copied.
    data_dir: ChainPath<DataDirPath>,
}
/// A signal to the backup service that a backup should be performed.
#[derive(Debug)]
pub enum BackupAction {
    /// Perform a backup at the given block number.
    ///
    /// The first field identifies the block the backup is taken at. The second field is the
    /// reply channel: the service sends `Some(block)` when the backup succeeded and `None` when
    /// it failed.
    BackupAtBlock(BlockNumHash, oneshot::Sender<Option<BlockNumHash>>),
}
37impl BackupService {
38    /// Create a new backup service
39    pub fn new(incoming: Receiver<BackupAction>, data_dir: ChainPath<DataDirPath>) -> Self {
40        Self { incoming, data_dir }
41    }
42
43    /// Main loop that processes backup actions
44    pub fn run(self) -> Result<(), ProviderError> {
45        debug!(target: "engine::backup", service=?self, "Backup service starting to run");
46        while let Ok(action) = self.incoming.recv() {
47            debug!(target: "engine::backup", action=?action, "Backup service received action");
48            match action {
49                BackupAction::BackupAtBlock(block_number, sender) => {
50                    let result = self.perform_backup(block_number);
51                    if let Err(e) = result {
52                        error!(target: "engine::backup", ?e, "Backup failed");
53                        let _ = sender.send(None);
54                    } else {
55                        let _ = sender.send(Some(block_number));
56                    }
57                }
58            }
59        }
60        Ok(())
61    }
62
63    /// Perform the actual backup operation
64    fn perform_backup(&self, block_number: BlockNumHash) -> Result<(), ProviderError> {
65        debug!(target: "engine::backup", ?block_number, "Starting backup");
66        let backup_path = PathBuf::from(format!("{}_backup", self.data_dir.data_dir().display(),));
67
68        // Perform the actual backup using the provider
69        BackupService::backup_dir(&PathBuf::from(self.data_dir.data_dir()), &backup_path)?;
70
71        info!(
72            target: "engine::backup",
73            ?block_number,
74            "Backup completed successfully"
75        );
76
77        Ok(())
78    }
79
80    /// Recursively copies the source directory to the destination directory.
81    ///
82    /// This function uses asynchronous file operations to perform the backup.
83    ///
84    /// # Arguments
85    ///
86    /// * `source` - The source directory to backup.
87    /// * `destination` - The destination directory where the backup will be stored.
88    ///
89    /// # Returns
90    ///
91    /// * `Ok(())` if the backup is successful.
92    /// * `Err(anyhow::Error)` if an error occurs during the backup.
93    pub fn backup_dir(source: &PathBuf, destination: &PathBuf) -> Result<(), ProviderError> {
94        debug!(target: "engine::backup", ?source, ?destination);
95
96        let source_path = source.as_path();
97        let destination_path = destination.as_path();
98
99        // Retrieve the metadata of the source path
100        let metadata = std::fs::metadata(source_path).map_err(|e| {
101            ProviderError::FsPathError(format!(
102                "Failed to access source path: {} : {}",
103                source_path.display(),
104                e,
105            ))
106        })?;
107
108        // If the source is a directory, create the destination directory if it does not exist
109        if metadata.is_dir() {
110            if !destination_path.exists() {
111                std::fs::create_dir_all(destination_path).map_err(|e| {
112                    ProviderError::FsPathError(format!(
113                        "Failed to create destination directory: {}",
114                        e
115                    ))
116                })?;
117            }
118
119            // Stack to manage recursive copying
120            let mut entries_stack =
121                vec![(source_path.to_path_buf(), destination_path.to_path_buf())];
122
123            while let Some((current_src, current_dst)) = entries_stack.pop() {
124                let mut entries = std::fs::read_dir(&current_src).map_err(|e| {
125                    ProviderError::FsPathError(format!(
126                        "Failed to read directory {}: {}",
127                        current_src.display(),
128                        e
129                    ))
130                })?;
131
132                while let Some(entry) = entries.next().transpose().map_err(|e| {
133                    ProviderError::FsPathError(format!("Failed to get diredctory entry: {}", e))
134                })? {
135                    let entry_path = entry.path();
136                    let entry_name = entry.file_name();
137                    let dst_path = current_dst.join(&entry_name);
138                    let entry_metadata = entry.metadata().map_err(|e| {
139                        ProviderError::FsPathError(format!("Failed to get diredctory entry: {}", e))
140                    })?;
141
142                    if entry_metadata.is_dir() {
143                        if !dst_path.exists() {
144                            std::fs::create_dir_all(&dst_path).map_err(|e| {
145                                ProviderError::FsPathError(format!(
146                                    "Failed to create directory {}: {}",
147                                    dst_path.display(),
148                                    e
149                                ))
150                            })?;
151                        }
152                        entries_stack.push((entry_path, dst_path));
153                    } else {
154                        std::fs::copy(&entry_path, &dst_path).map_err(|e| {
155                            ProviderError::FsPathError(format!(
156                                "Failed to copy file from {} to {}: {}",
157                                entry_path.display(),
158                                dst_path.display(),
159                                e
160                            ))
161                        })?;
162                    }
163                }
164            }
165        } else {
166            // If the source is a file, copy it directly, creating parent directories if necessary
167            if let Some(parent) = destination_path.parent() {
168                if !parent.exists() {
169                    std::fs::create_dir_all(parent).map_err(|e| {
170                        ProviderError::FsPathError(format!(
171                            "Failed to create parent directory {}: {}",
172                            parent.display(),
173                            e
174                        ))
175                    })?;
176                }
177            }
178            std::fs::copy(source_path, destination_path).map_err(|e| {
179                ProviderError::FsPathError(format!(
180                    "Failed to copy file from {} to {}: {}",
181                    source_path.display(),
182                    destination_path.display(),
183                    e
184                ))
185            })?;
186        }
187
188        Ok(())
189    }
190}
191
/// Errors that can occur during backup operations
// NOTE(review): the functions visible in this file return `ProviderError` directly and never
// construct `BackupError`; confirm whether it is used elsewhere.
#[derive(Debug, Error)]
pub enum BackupError {
    /// IO error
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Provider error
    // NOTE(review): named via `reth_provider` here while the rest of the file imports
    // `reth_errors::ProviderError` - presumably the same type re-exported; confirm.
    #[error(transparent)]
    Provider(#[from] reth_provider::ProviderError),
}
202
/// Handle to interact with the backup service
///
/// Besides the request channel, the handle tracks whether a backup is currently pending and
/// which block the most recent finished backup was recorded for.
#[derive(Debug)]
pub struct BackupHandle {
    /// The sender for backup actions
    pub sender: Sender<BackupAction>,
    /// The receiver from backup service
    ///
    /// `Some` while a backup is pending; the `Instant` is the moment the backup was registered
    /// as started. `None` when no backup is in progress.
    pub rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>,
    /// The latest backup block number
    ///
    /// Holds the default `BlockNumHash` until a backup has been marked as finished.
    pub latest_backup_block: BlockNumHash,
}
213
214impl BackupHandle {
215    /// Create a new backup handle
216    pub fn new(sender: Sender<BackupAction>) -> Self {
217        Self { sender, rx: None, latest_backup_block: BlockNumHash::default() }
218    }
219
220    /// Spawn a new backup service
221    pub fn spawn_service(data_dir: ChainPath<DataDirPath>) -> BackupHandle {
222        let (tx, rx) = std::sync::mpsc::channel();
223        let handle = BackupHandle::new(tx);
224
225        let service = BackupService::new(rx, data_dir);
226        std::thread::Builder::new()
227            .name("Backup Service".to_string())
228            .spawn(move || {
229                if let Err(err) = service.run() {
230                    error!(target: "engine::backup", ?err, "Backup service failed");
231                }
232            })
233            .unwrap();
234
235        handle
236    }
237
238    /// Checks if a backup is currently in progress.
239    pub fn in_progress(&self) -> bool {
240        self.rx.is_some()
241    }
242
243    /// Sets state for a started backup task.
244    pub(crate) fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) {
245        self.rx = Some((rx, Instant::now()));
246    }
247
248    /// Sets state for a finished backup task.
249    pub fn finish(&mut self, block_number: BlockNumHash) {
250        self.latest_backup_block = block_number;
251        self.rx = None;
252    }
253}