1#![cfg_attr(feature = "disable-lock", allow(dead_code))]
4
5use reth_storage_errors::lockfile::StorageLockError;
6use reth_tracing::tracing::error;
7use std::{
8 path::{Path, PathBuf},
9 process,
10 sync::{Arc, OnceLock},
11};
12use sysinfo::{ProcessRefreshKind, RefreshKind, System};
13
14const LOCKFILE_NAME: &str = "lock";
16
17#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct StorageLock(Arc<StorageLockInner>);
25
26impl StorageLock {
27 pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
33 #[cfg(feature = "disable-lock")]
34 {
35 let file_path = path.join(LOCKFILE_NAME);
36 Ok(Self(Arc::new(StorageLockInner { file_path })))
38 }
39
40 #[cfg(not(feature = "disable-lock"))]
41 Self::try_acquire_file_lock(path)
42 }
43
44 #[cfg(any(test, not(feature = "disable-lock")))]
46 fn try_acquire_file_lock(path: &Path) -> Result<Self, StorageLockError> {
47 let file_path = path.join(LOCKFILE_NAME);
48 if let Some(process_lock) = ProcessUID::parse(&file_path)? {
49 if process_lock.pid != (process::id() as usize) && process_lock.is_active() {
50 error!(
51 target: "reth::db::lockfile",
52 path = ?file_path,
53 pid = process_lock.pid,
54 start_time = process_lock.start_time,
55 "Storage lock already taken."
56 );
57 return Err(StorageLockError::Taken(process_lock.pid))
58 }
59 }
60
61 Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
62 }
63}
64
65impl Drop for StorageLock {
66 fn drop(&mut self) {
67 if Arc::strong_count(&self.0) == 1 && self.0.file_path.exists() {
68 if let Err(err) = reth_fs_util::remove_file(&self.0.file_path) {
72 error!(%err, "Failed to delete lock file");
73 }
74 }
75 }
76}
77
78#[derive(Debug, PartialEq, Eq)]
79struct StorageLockInner {
80 file_path: PathBuf,
81}
82
83impl StorageLockInner {
84 fn new(file_path: PathBuf) -> Result<Self, StorageLockError> {
86 if let Some(parent) = file_path.parent() {
88 reth_fs_util::create_dir_all(parent)?;
89 }
90
91 ProcessUID::own().write(&file_path)?;
93
94 Ok(Self { file_path })
95 }
96}
97
98#[derive(Clone, Debug)]
99struct ProcessUID {
100 pid: usize,
102 start_time: u64,
104}
105
106impl ProcessUID {
107 fn new(pid: usize) -> Option<Self> {
109 let mut system = System::new();
110 let pid2 = sysinfo::Pid::from(pid);
111 system.refresh_processes_specifics(
112 sysinfo::ProcessesToUpdate::Some(&[pid2]),
113 true,
114 ProcessRefreshKind::new(),
115 );
116 system.process(pid2).map(|process| Self { pid, start_time: process.start_time() })
117 }
118
119 fn own() -> Self {
121 static CACHE: OnceLock<ProcessUID> = OnceLock::new();
122 CACHE.get_or_init(|| Self::new(process::id() as usize).expect("own process")).clone()
123 }
124
125 fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
127 if path.exists() {
128 if let Ok(contents) = reth_fs_util::read_to_string(path) {
129 let mut lines = contents.lines();
130 if let (Some(Ok(pid)), Some(Ok(start_time))) = (
131 lines.next().map(str::trim).map(str::parse),
132 lines.next().map(str::trim).map(str::parse),
133 ) {
134 return Ok(Some(Self { pid, start_time }));
135 }
136 }
137 }
138 Ok(None)
139 }
140
141 fn is_active(&self) -> bool {
143 System::new_with_specifics(RefreshKind::new().with_processes(ProcessRefreshKind::new()))
144 .process(self.pid.into())
145 .is_some_and(|p| p.start_time() == self.start_time)
146 }
147
148 fn write(&self, path: &Path) -> Result<(), StorageLockError> {
150 Ok(reth_fs_util::write(path, format!("{}\n{}", self.pid, self.start_time))?)
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157 use std::sync::{Mutex, MutexGuard, OnceLock};
158
159 static SERIAL: OnceLock<Mutex<()>> = OnceLock::new();
161
162 fn serial_lock() -> MutexGuard<'static, ()> {
163 SERIAL.get_or_init(|| Mutex::new(())).lock().unwrap()
164 }
165
166 #[test]
167 fn test_lock() {
168 let _guard = serial_lock();
169
170 let temp_dir = tempfile::tempdir().unwrap();
171
172 let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();
173
174 assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));
176
177 let lock_file = temp_dir.path().join(LOCKFILE_NAME);
179 let mut fake_pid = 1337;
180 let system = System::new_all();
181 while system.process(fake_pid.into()).is_some() {
182 fake_pid += 1;
183 }
184 ProcessUID { pid: fake_pid, start_time: u64::MAX }.write(&lock_file).unwrap();
185 assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));
186
187 let mut pid_1 = ProcessUID::new(1).unwrap();
188
189 pid_1.write(&lock_file).unwrap();
191 assert_eq!(
192 Err(StorageLockError::Taken(1)),
193 StorageLock::try_acquire_file_lock(temp_dir.path())
194 );
195
196 pid_1.start_time += 1;
198 pid_1.write(&lock_file).unwrap();
199 assert_eq!(Ok(lock), StorageLock::try_acquire_file_lock(temp_dir.path()));
200 }
201
202 #[test]
203 fn test_drop_lock() {
204 let _guard = serial_lock();
205
206 let temp_dir = tempfile::tempdir().unwrap();
207 let lock_file = temp_dir.path().join(LOCKFILE_NAME);
208
209 let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();
210
211 assert!(lock_file.exists());
212 drop(lock);
213 assert!(!lock_file.exists());
214 }
215}