reth_cli_runner/
lib.rs

1//! A tokio based CLI runner.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11//! Entrypoint for running commands.
12
13use reth_tasks::{TaskExecutor, TaskManager};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tracing::{debug, error, trace};
16
/// Entry point for executing CLI commands.
///
/// A stateless handle whose methods create a tokio runtime and run a command on it to
/// completion.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct CliRunner;
23
24// === impl CliRunner ===
25
26impl CliRunner {
27    /// Executes the given _async_ command on the tokio runtime until the command future resolves or
28    /// until the process receives a `SIGINT` or `SIGTERM` signal.
29    ///
30    /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
31    /// to drive their shutdown to completion after the command has finished.
32    pub fn run_command_until_exit<F, E>(
33        self,
34        command: impl FnOnce(CliContext) -> F,
35    ) -> Result<(), E>
36    where
37        F: Future<Output = Result<(), E>>,
38        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
39    {
40        let AsyncCliRunner { context, mut task_manager, tokio_runtime } = AsyncCliRunner::new()?;
41
42        // Executes the command until it finished or ctrl-c was fired
43        let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
44            &mut task_manager,
45            run_until_ctrl_c(command(context)),
46        ));
47
48        if command_res.is_err() {
49            error!(target: "reth::cli", "shutting down due to error");
50        } else {
51            debug!(target: "reth::cli", "shutting down gracefully");
52            // after the command has finished or exit signal was received we shutdown the task
53            // manager which fires the shutdown signal to all tasks spawned via the task
54            // executor and awaiting on tasks spawned with graceful shutdown
55            task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
56        }
57
58        // `drop(tokio_runtime)` would block the current thread until its pools
59        // (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
60        // it on a separate thread and wait for up to 5 seconds for this operation to
61        // complete.
62        let (tx, rx) = mpsc::channel();
63        std::thread::Builder::new()
64            .name("tokio-runtime-shutdown".to_string())
65            .spawn(move || {
66                drop(tokio_runtime);
67                let _ = tx.send(());
68            })
69            .unwrap();
70
71        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
72            debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
73        });
74
75        command_res
76    }
77
78    /// Executes a regular future until completion or until external signal received.
79    pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
80    where
81        F: Future<Output = Result<(), E>>,
82        E: Send + Sync + From<std::io::Error> + 'static,
83    {
84        let tokio_runtime = tokio_runtime()?;
85        tokio_runtime.block_on(run_until_ctrl_c(fut))?;
86        Ok(())
87    }
88
89    /// Executes a regular future as a spawned blocking task until completion or until external
90    /// signal received.
91    ///
92    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
93    pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
94    where
95        F: Future<Output = Result<(), E>> + Send + 'static,
96        E: Send + Sync + From<std::io::Error> + 'static,
97    {
98        let tokio_runtime = tokio_runtime()?;
99        let handle = tokio_runtime.handle().clone();
100        let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
101        tokio_runtime
102            .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
103
104        // drop the tokio runtime on a separate thread because drop blocks until its pools
105        // (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
106        // the current thread but we want to exit right away.
107        std::thread::Builder::new()
108            .name("tokio-runtime-shutdown".to_string())
109            .spawn(move || drop(tokio_runtime))
110            .unwrap();
111
112        Ok(())
113    }
114}
115
116/// [`CliRunner`] configuration when executing commands asynchronously
117struct AsyncCliRunner {
118    context: CliContext,
119    task_manager: TaskManager,
120    tokio_runtime: tokio::runtime::Runtime,
121}
122
123// === impl AsyncCliRunner ===
124
125impl AsyncCliRunner {
126    /// Attempts to create a tokio Runtime and additional context required to execute commands
127    /// asynchronously.
128    fn new() -> Result<Self, std::io::Error> {
129        let tokio_runtime = tokio_runtime()?;
130        let task_manager = TaskManager::new(tokio_runtime.handle().clone());
131        let task_executor = task_manager.executor();
132        Ok(Self { context: CliContext { task_executor }, task_manager, tokio_runtime })
133    }
134}
135
136/// Additional context provided by the [`CliRunner`] when executing commands
137#[derive(Debug)]
138pub struct CliContext {
139    /// Used to execute/spawn tasks
140    pub task_executor: TaskExecutor,
141}
142
143/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
144/// enabled
145pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
146    tokio::runtime::Builder::new_multi_thread().enable_all().build()
147}
148
149/// Runs the given future to completion or until a critical task panicked.
150///
151/// Returns the error if a task panicked, or the given future returned an error.
152async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
153where
154    F: Future<Output = Result<(), E>>,
155    E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
156{
157    {
158        let fut = pin!(fut);
159        tokio::select! {
160            err = tasks => {
161                return Err(err.into())
162            },
163            res = fut => res?,
164        }
165    }
166    Ok(())
167}
168
169/// Runs the future to completion or until:
170/// - `ctrl-c` is received.
171/// - `SIGTERM` is received (unix only).
172async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
173where
174    F: Future<Output = Result<(), E>>,
175    E: Send + Sync + 'static + From<std::io::Error>,
176{
177    let ctrl_c = tokio::signal::ctrl_c();
178
179    #[cfg(unix)]
180    {
181        let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
182        let sigterm = stream.recv();
183        let sigterm = pin!(sigterm);
184        let ctrl_c = pin!(ctrl_c);
185        let fut = pin!(fut);
186
187        tokio::select! {
188            _ = ctrl_c => {
189                trace!(target: "reth::cli", "Received ctrl-c");
190            },
191            _ = sigterm => {
192                trace!(target: "reth::cli", "Received SIGTERM");
193            },
194            res = fut => res?,
195        }
196    }
197
198    #[cfg(not(unix))]
199    {
200        let ctrl_c = pin!(ctrl_c);
201        let fut = pin!(fut);
202
203        tokio::select! {
204            _ = ctrl_c => {
205                trace!(target: "reth::cli", "Received ctrl-c");
206            },
207            res = fut => res?,
208        }
209    }
210
211    Ok(())
212}