// reth_engine_tree/backup.rs
use alloy_eips::BlockNumHash;
use reth_errors::ProviderError;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use std::{
    fs, io,
    path::{Path, PathBuf},
    sync::mpsc::{Receiver, Sender},
    time::Instant,
};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::*;

/// A pair of directory paths describing a backup.
///
/// NOTE(review): nothing visible in this file constructs or reads this type; the meaning of
/// the fields below is inferred from their names — confirm against callers.
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Presumably the directory to copy from.
    pub source_dir: PathBuf,
    /// Presumably the directory to copy into.
    pub dest_dir: PathBuf,
}

23#[derive(Debug)]
25pub struct BackupService {
26 incoming: Receiver<BackupAction>,
28 data_dir: ChainPath<DataDirPath>,
30}
31#[derive(Debug)]
33pub enum BackupAction {
34 BackupAtBlock(BlockNumHash, oneshot::Sender<Option<BlockNumHash>>),
36}
37impl BackupService {
38 pub fn new(incoming: Receiver<BackupAction>, data_dir: ChainPath<DataDirPath>) -> Self {
40 Self { incoming, data_dir }
41 }
42
43 pub fn run(self) -> Result<(), ProviderError> {
45 debug!(target: "engine::backup", service=?self, "Backup service starting to run");
46 while let Ok(action) = self.incoming.recv() {
47 debug!(target: "engine::backup", action=?action, "Backup service received action");
48 match action {
49 BackupAction::BackupAtBlock(block_number, sender) => {
50 let result = self.perform_backup(block_number);
51 if let Err(e) = result {
52 error!(target: "engine::backup", ?e, "Backup failed");
53 let _ = sender.send(None);
54 } else {
55 let _ = sender.send(Some(block_number));
56 }
57 }
58 }
59 }
60 Ok(())
61 }
62
63 fn perform_backup(&self, block_number: BlockNumHash) -> Result<(), ProviderError> {
65 debug!(target: "engine::backup", ?block_number, "Starting backup");
66 let backup_path = PathBuf::from(format!("{}_backup", self.data_dir.data_dir().display(),));
67
68 BackupService::backup_dir(&PathBuf::from(self.data_dir.data_dir()), &backup_path)?;
70
71 info!(
72 target: "engine::backup",
73 ?block_number,
74 "Backup completed successfully"
75 );
76
77 Ok(())
78 }
79
80 pub fn backup_dir(source: &PathBuf, destination: &PathBuf) -> Result<(), ProviderError> {
94 debug!(target: "engine::backup", ?source, ?destination);
95
96 let source_path = source.as_path();
97 let destination_path = destination.as_path();
98
99 let metadata = std::fs::metadata(source_path)
101 .expect(&format!("Failed to access source path: {} ", source_path.display(),));
102
103 if metadata.is_dir() {
105 if !destination_path.exists() {
106 std::fs::create_dir_all(destination_path)
107 .expect(&format!("Failed to create destination directory"));
108 }
109
110 let mut entries_stack =
112 vec![(source_path.to_path_buf(), destination_path.to_path_buf())];
113
114 while let Some((current_src, current_dst)) = entries_stack.pop() {
115 let mut entries = std::fs::read_dir(¤t_src)
116 .expect(&format!("Failed to read directory {}", current_src.display(),));
117
118 while let Some(entry) =
119 entries.next().transpose().expect(&format!("Failed to get diredctory entry"))
120 {
121 let entry_path = entry.path();
122 let entry_name = entry.file_name();
123 let dst_path = current_dst.join(&entry_name);
124 let entry_metadata =
125 entry.metadata().expect(&format!("Failed to get diredctory entry"));
126
127 if entry_metadata.is_dir() {
128 if !dst_path.exists() {
129 std::fs::create_dir_all(&dst_path).expect(&format!(
130 "Failed to create directory {}",
131 dst_path.display(),
132 ));
133 }
134 entries_stack.push((entry_path, dst_path));
135 } else {
136 std::fs::copy(&entry_path, &dst_path).expect(&format!(
137 "Failed to copy file from {} to {}",
138 entry_path.display(),
139 dst_path.display(),
140 ));
141 }
142 }
143 }
144 } else {
145 if let Some(parent) = destination_path.parent() {
147 if !parent.exists() {
148 std::fs::create_dir_all(parent)
149 .expect(
150 &format!("Failed to create parent directory {}", parent.display(),),
151 );
152 }
153 }
154 std::fs::copy(source_path, destination_path).expect(&format!(
155 "Failed to copy file from {} to {}",
156 source_path.display(),
157 destination_path.display(),
158 ));
159 }
160
161 Ok(())
162 }
163}
164
165#[derive(Debug, Error)]
167pub enum BackupError {
168 #[error(transparent)]
170 Io(#[from] std::io::Error),
171 #[error(transparent)]
173 Provider(#[from] reth_provider::ProviderError),
174}
175
176#[derive(Debug)]
178pub struct BackupHandle {
179 pub sender: Sender<BackupAction>,
181 pub rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>,
183 pub latest_backup_block: BlockNumHash,
185}
186
187impl BackupHandle {
188 pub fn new(sender: Sender<BackupAction>) -> Self {
190 Self { sender, rx: None, latest_backup_block: BlockNumHash::default() }
191 }
192
193 pub fn spawn_service(data_dir: ChainPath<DataDirPath>) -> BackupHandle {
195 let (tx, rx) = std::sync::mpsc::channel();
196 let handle = BackupHandle::new(tx);
197
198 let service = BackupService::new(rx, data_dir);
199 std::thread::Builder::new()
200 .name("Backup Service".to_string())
201 .spawn(move || {
202 if let Err(err) = service.run() {
203 error!(target: "engine::backup", ?err, "Backup service failed");
204 }
205 })
206 .unwrap();
207
208 handle
209 }
210
211 pub fn in_progress(&self) -> bool {
213 self.rx.is_some()
214 }
215
216 pub(crate) fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) {
218 self.rx = Some((rx, Instant::now()));
219 }
220
221 pub fn finish(&mut self, block_number: BlockNumHash) {
223 self.latest_backup_block = block_number;
224 self.rx = None;
225 }
226}