reth_engine_tree/
backup.rs1use alloy_eips::BlockNumHash;
3use reth_errors::ProviderError;
4use reth_node_core::dirs::{ChainPath, DataDirPath};
5use reth_tracing::tracing::*;
6use std::{
7 path::PathBuf,
8 sync::mpsc::{Receiver, Sender},
9 time::Instant,
10};
11use thiserror::Error;
12use tokio::sync::oneshot;
13
14#[derive(Debug, Clone)]
16pub struct BackupConfig {
17 pub source_dir: PathBuf,
19 pub dest_dir: PathBuf,
21}
22
23#[derive(Debug)]
25pub struct BackupService {
26 incoming: Receiver<BackupAction>,
28 data_dir: ChainPath<DataDirPath>,
30}
31#[derive(Debug)]
33pub enum BackupAction {
34 BackupAtBlock(BlockNumHash, oneshot::Sender<Option<BlockNumHash>>),
36}
37impl BackupService {
38 pub fn new(incoming: Receiver<BackupAction>, data_dir: ChainPath<DataDirPath>) -> Self {
40 Self { incoming, data_dir }
41 }
42
43 pub fn run(self) -> Result<(), ProviderError> {
45 debug!(target: "engine::backup", service=?self, "Backup service starting to run");
46 while let Ok(action) = self.incoming.recv() {
47 debug!(target: "engine::backup", action=?action, "Backup service received action");
48 match action {
49 BackupAction::BackupAtBlock(block_number, sender) => {
50 let result = self.perform_backup(block_number);
51 if let Err(e) = result {
52 error!(target: "engine::backup", ?e, "Backup failed");
53 let _ = sender.send(None);
54 } else {
55 let _ = sender.send(Some(block_number));
56 }
57 }
58 }
59 }
60 Ok(())
61 }
62
63 fn perform_backup(&self, block_number: BlockNumHash) -> Result<(), ProviderError> {
65 debug!(target: "engine::backup", ?block_number, "Starting backup");
66 let backup_path = PathBuf::from(format!("{}_backup", self.data_dir.data_dir().display(),));
67
68 BackupService::backup_dir(&PathBuf::from(self.data_dir.data_dir()), &backup_path)?;
70
71 info!(
72 target: "engine::backup",
73 ?block_number,
74 "Backup completed successfully"
75 );
76
77 Ok(())
78 }
79
80 pub fn backup_dir(source: &PathBuf, destination: &PathBuf) -> Result<(), ProviderError> {
94 debug!(target: "engine::backup", ?source, ?destination);
95
96 let source_path = source.as_path();
97 let destination_path = destination.as_path();
98
99 let metadata = std::fs::metadata(source_path).map_err(|e| {
101 ProviderError::FsPathError(format!(
102 "Failed to access source path: {} : {}",
103 source_path.display(),
104 e,
105 ))
106 })?;
107
108 if metadata.is_dir() {
110 if !destination_path.exists() {
111 std::fs::create_dir_all(destination_path).map_err(|e| {
112 ProviderError::FsPathError(format!(
113 "Failed to create destination directory: {}",
114 e
115 ))
116 })?;
117 }
118
119 let mut entries_stack =
121 vec![(source_path.to_path_buf(), destination_path.to_path_buf())];
122
123 while let Some((current_src, current_dst)) = entries_stack.pop() {
124 let mut entries = std::fs::read_dir(¤t_src).map_err(|e| {
125 ProviderError::FsPathError(format!(
126 "Failed to read directory {}: {}",
127 current_src.display(),
128 e
129 ))
130 })?;
131
132 while let Some(entry) = entries.next().transpose().map_err(|e| {
133 ProviderError::FsPathError(format!("Failed to get diredctory entry: {}", e))
134 })? {
135 let entry_path = entry.path();
136 let entry_name = entry.file_name();
137 let dst_path = current_dst.join(&entry_name);
138 let entry_metadata = entry.metadata().map_err(|e| {
139 ProviderError::FsPathError(format!("Failed to get diredctory entry: {}", e))
140 })?;
141
142 if entry_metadata.is_dir() {
143 if !dst_path.exists() {
144 std::fs::create_dir_all(&dst_path).map_err(|e| {
145 ProviderError::FsPathError(format!(
146 "Failed to create directory {}: {}",
147 dst_path.display(),
148 e
149 ))
150 })?;
151 }
152 entries_stack.push((entry_path, dst_path));
153 } else {
154 std::fs::copy(&entry_path, &dst_path).map_err(|e| {
155 ProviderError::FsPathError(format!(
156 "Failed to copy file from {} to {}: {}",
157 entry_path.display(),
158 dst_path.display(),
159 e
160 ))
161 })?;
162 }
163 }
164 }
165 } else {
166 if let Some(parent) = destination_path.parent() {
168 if !parent.exists() {
169 std::fs::create_dir_all(parent).map_err(|e| {
170 ProviderError::FsPathError(format!(
171 "Failed to create parent directory {}: {}",
172 parent.display(),
173 e
174 ))
175 })?;
176 }
177 }
178 std::fs::copy(source_path, destination_path).map_err(|e| {
179 ProviderError::FsPathError(format!(
180 "Failed to copy file from {} to {}: {}",
181 source_path.display(),
182 destination_path.display(),
183 e
184 ))
185 })?;
186 }
187
188 Ok(())
189 }
190}
191
192#[derive(Debug, Error)]
194pub enum BackupError {
195 #[error(transparent)]
197 Io(#[from] std::io::Error),
198 #[error(transparent)]
200 Provider(#[from] reth_provider::ProviderError),
201}
202
203#[derive(Debug)]
205pub struct BackupHandle {
206 pub sender: Sender<BackupAction>,
208 pub rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>,
210 pub latest_backup_block: BlockNumHash,
212}
213
214impl BackupHandle {
215 pub fn new(sender: Sender<BackupAction>) -> Self {
217 Self { sender, rx: None, latest_backup_block: BlockNumHash::default() }
218 }
219
220 pub fn spawn_service(data_dir: ChainPath<DataDirPath>) -> BackupHandle {
222 let (tx, rx) = std::sync::mpsc::channel();
223 let handle = BackupHandle::new(tx);
224
225 let service = BackupService::new(rx, data_dir);
226 std::thread::Builder::new()
227 .name("Backup Service".to_string())
228 .spawn(move || {
229 if let Err(err) = service.run() {
230 error!(target: "engine::backup", ?err, "Backup service failed");
231 }
232 })
233 .unwrap();
234
235 handle
236 }
237
238 pub fn in_progress(&self) -> bool {
240 self.rx.is_some()
241 }
242
243 pub(crate) fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) {
245 self.rx = Some((rx, Instant::now()));
246 }
247
248 pub fn finish(&mut self, block_number: BlockNumHash) {
250 self.latest_backup_block = block_number;
251 self.rx = None;
252 }
253}