reth_db/
lockfile.rs

1//! Storage lock utils.
2
3#![cfg_attr(feature = "disable-lock", allow(dead_code))]
4
5use reth_storage_errors::lockfile::StorageLockError;
6use reth_tracing::tracing::error;
7use std::{
8    path::{Path, PathBuf},
9    process,
10    sync::{Arc, OnceLock},
11};
12use sysinfo::{ProcessRefreshKind, RefreshKind, System};
13
/// Name of the file, inside the storage directory, that records which process holds the lock.
const LOCKFILE_NAME: &str = "lock";
16
17/// A file lock for a storage directory to ensure exclusive read-write access across different
18/// processes.
19///
20/// This lock stores the PID of the process holding it and is released (deleted) on a graceful
21/// shutdown. On resuming from a crash, the stored PID helps verify that no other process holds the
22/// lock.
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct StorageLock(Arc<StorageLockInner>);
25
26impl StorageLock {
27    /// Tries to acquire a write lock on the target directory, returning [`StorageLockError`] if
28    /// unsuccessful.
29    ///
30    /// Note: In-process exclusivity is not on scope. If called from the same process (or another
31    /// with the same PID), it will succeed.
32    pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
33        #[cfg(feature = "disable-lock")]
34        {
35            let file_path = path.join(LOCKFILE_NAME);
36            // Too expensive for ef-tests to write/read lock to/from disk.
37            Ok(Self(Arc::new(StorageLockInner { file_path })))
38        }
39
40        #[cfg(not(feature = "disable-lock"))]
41        Self::try_acquire_file_lock(path)
42    }
43
44    /// Acquire a file write lock.
45    #[cfg(any(test, not(feature = "disable-lock")))]
46    fn try_acquire_file_lock(path: &Path) -> Result<Self, StorageLockError> {
47        let file_path = path.join(LOCKFILE_NAME);
48        if let Some(process_lock) = ProcessUID::parse(&file_path)? {
49            if process_lock.pid != (process::id() as usize) && process_lock.is_active() {
50                error!(
51                    target: "reth::db::lockfile",
52                    path = ?file_path,
53                    pid = process_lock.pid,
54                    start_time = process_lock.start_time,
55                    "Storage lock already taken."
56                );
57                return Err(StorageLockError::Taken(process_lock.pid))
58            }
59        }
60
61        Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
62    }
63}
64
65impl Drop for StorageLock {
66    fn drop(&mut self) {
67        if Arc::strong_count(&self.0) == 1 && self.0.file_path.exists() {
68            // TODO: should only happen during tests that the file does not exist: tempdir is
69            // getting dropped first. However, tempdir shouldn't be dropped
70            // before any of the storage providers.
71            if let Err(err) = reth_fs_util::remove_file(&self.0.file_path) {
72                error!(%err, "Failed to delete lock file");
73            }
74        }
75    }
76}
77
/// State shared by every clone of a [`StorageLock`].
#[derive(Debug, PartialEq, Eq)]
struct StorageLockInner {
    /// Full path of the lock file (storage directory joined with the lock file name).
    file_path: PathBuf,
}
82
83impl StorageLockInner {
84    /// Creates lock file and writes this process PID into it.
85    fn new(file_path: PathBuf) -> Result<Self, StorageLockError> {
86        // Create the directory if it doesn't exist
87        if let Some(parent) = file_path.parent() {
88            reth_fs_util::create_dir_all(parent)?;
89        }
90
91        // Write this process unique identifier (pid & start_time) to file
92        ProcessUID::own().write(&file_path)?;
93
94        Ok(Self { file_path })
95    }
96}
97
/// Identity of a process as recorded in the lock file.
///
/// The start time is stored next to the PID. A process that reuses the same PID but started
/// at a different time is therefore not mistaken for the original lock holder.
#[derive(Clone, Debug)]
struct ProcessUID {
    /// Operating-system process id.
    pid: usize,
    /// Start time of the process, as reported by `sysinfo`.
    start_time: u64,
}
105
106impl ProcessUID {
107    /// Creates [`Self`] for the provided PID.
108    fn new(pid: usize) -> Option<Self> {
109        let mut system = System::new();
110        let pid2 = sysinfo::Pid::from(pid);
111        system.refresh_processes_specifics(
112            sysinfo::ProcessesToUpdate::Some(&[pid2]),
113            true,
114            ProcessRefreshKind::new(),
115        );
116        system.process(pid2).map(|process| Self { pid, start_time: process.start_time() })
117    }
118
119    /// Creates [`Self`] from own process.
120    fn own() -> Self {
121        static CACHE: OnceLock<ProcessUID> = OnceLock::new();
122        CACHE.get_or_init(|| Self::new(process::id() as usize).expect("own process")).clone()
123    }
124
125    /// Parses [`Self`] from a file.
126    fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
127        if path.exists() {
128            if let Ok(contents) = reth_fs_util::read_to_string(path) {
129                let mut lines = contents.lines();
130                if let (Some(Ok(pid)), Some(Ok(start_time))) = (
131                    lines.next().map(str::trim).map(str::parse),
132                    lines.next().map(str::trim).map(str::parse),
133                ) {
134                    return Ok(Some(Self { pid, start_time }));
135                }
136            }
137        }
138        Ok(None)
139    }
140
141    /// Whether a process with this `pid` and `start_time` exists.
142    fn is_active(&self) -> bool {
143        System::new_with_specifics(RefreshKind::new().with_processes(ProcessRefreshKind::new()))
144            .process(self.pid.into())
145            .is_some_and(|p| p.start_time() == self.start_time)
146    }
147
148    /// Writes `pid` and `start_time` to a file.
149    fn write(&self, path: &Path) -> Result<(), StorageLockError> {
150        Ok(reth_fs_util::write(path, format!("{}\n{}", self.pid, self.start_time))?)
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Process-wide mutex that forces the lock tests to run one after another.
    static SERIAL: OnceLock<Mutex<()>> = OnceLock::new();

    /// Blocks until the calling test has exclusive access.
    fn serial_lock() -> MutexGuard<'static, ()> {
        let mutex = SERIAL.get_or_init(|| Mutex::new(()));
        mutex.lock().unwrap()
    }

    /// First PID, counting up from 1337, that does not belong to any running process.
    fn unused_pid() -> usize {
        let system = System::new_all();
        (1337..).find(|&candidate: &usize| system.process(candidate.into()).is_none()).unwrap()
    }

    #[test]
    fn test_lock() {
        let _guard = serial_lock();

        let dir = tempfile::tempdir().unwrap();
        let acquire = || StorageLock::try_acquire_file_lock(dir.path());

        let lock = acquire().unwrap();

        // The very same process may take the lock again.
        assert_eq!(Ok(lock.clone()), acquire());

        // A lock recorded for a PID that no longer exists is stale and can be taken over.
        let lock_file = dir.path().join(LOCKFILE_NAME);
        ProcessUID { pid: unused_pid(), start_time: u64::MAX }.write(&lock_file).unwrap();
        assert_eq!(Ok(lock.clone()), acquire());

        // A lock recorded for a live process with its real start time blocks acquisition.
        let mut pid_one = ProcessUID::new(1).unwrap();
        pid_one.write(&lock_file).unwrap();
        assert_eq!(Err(StorageLockError::Taken(1)), acquire());

        // Same live PID but a different start time means the PID was reused: the lock is stale.
        pid_one.start_time += 1;
        pid_one.write(&lock_file).unwrap();
        assert_eq!(Ok(lock), acquire());
    }

    #[test]
    fn test_drop_lock() {
        let _guard = serial_lock();

        let dir = tempfile::tempdir().unwrap();
        let lock_file = dir.path().join(LOCKFILE_NAME);

        let lock = StorageLock::try_acquire_file_lock(dir.path()).unwrap();
        assert!(lock_file.exists());

        // Releasing the only handle must remove the file again.
        drop(lock);
        assert!(!lock_file.exists());
    }
}