1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use reth_tasks::{TaskExecutor, TaskManager};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tracing::{debug, error, trace};
16
17#[derive(Clone, Debug, Default)]
21#[non_exhaustive]
22pub struct CliRunner;
23
24impl CliRunner {
27 pub fn run_command_until_exit<F, E>(
33 self,
34 command: impl FnOnce(CliContext) -> F,
35 ) -> Result<(), E>
36 where
37 F: Future<Output = Result<(), E>>,
38 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
39 {
40 let AsyncCliRunner { context, mut task_manager, tokio_runtime } = AsyncCliRunner::new()?;
41
42 let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
44 &mut task_manager,
45 run_until_ctrl_c(command(context)),
46 ));
47
48 if command_res.is_err() {
49 error!(target: "reth::cli", "shutting down due to error");
50 } else {
51 debug!(target: "reth::cli", "shutting down gracefully");
52 task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
56 }
57
58 let (tx, rx) = mpsc::channel();
63 std::thread::Builder::new()
64 .name("tokio-runtime-shutdown".to_string())
65 .spawn(move || {
66 drop(tokio_runtime);
67 let _ = tx.send(());
68 })
69 .unwrap();
70
71 let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
72 debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
73 });
74
75 command_res
76 }
77
78 pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
80 where
81 F: Future<Output = Result<(), E>>,
82 E: Send + Sync + From<std::io::Error> + 'static,
83 {
84 let tokio_runtime = tokio_runtime()?;
85 tokio_runtime.block_on(run_until_ctrl_c(fut))?;
86 Ok(())
87 }
88
89 pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
94 where
95 F: Future<Output = Result<(), E>> + Send + 'static,
96 E: Send + Sync + From<std::io::Error> + 'static,
97 {
98 let tokio_runtime = tokio_runtime()?;
99 let handle = tokio_runtime.handle().clone();
100 let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
101 tokio_runtime
102 .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
103
104 std::thread::Builder::new()
108 .name("tokio-runtime-shutdown".to_string())
109 .spawn(move || drop(tokio_runtime))
110 .unwrap();
111
112 Ok(())
113 }
114}
115
116struct AsyncCliRunner {
118 context: CliContext,
119 task_manager: TaskManager,
120 tokio_runtime: tokio::runtime::Runtime,
121}
122
123impl AsyncCliRunner {
126 fn new() -> Result<Self, std::io::Error> {
129 let tokio_runtime = tokio_runtime()?;
130 let task_manager = TaskManager::new(tokio_runtime.handle().clone());
131 let task_executor = task_manager.executor();
132 Ok(Self { context: CliContext { task_executor }, task_manager, tokio_runtime })
133 }
134}
135
136#[derive(Debug)]
138pub struct CliContext {
139 pub task_executor: TaskExecutor,
141}
142
143pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
146 tokio::runtime::Builder::new_multi_thread().enable_all().build()
147}
148
149async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
153where
154 F: Future<Output = Result<(), E>>,
155 E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
156{
157 {
158 let fut = pin!(fut);
159 tokio::select! {
160 err = tasks => {
161 return Err(err.into())
162 },
163 res = fut => res?,
164 }
165 }
166 Ok(())
167}
168
169async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
173where
174 F: Future<Output = Result<(), E>>,
175 E: Send + Sync + 'static + From<std::io::Error>,
176{
177 let ctrl_c = tokio::signal::ctrl_c();
178
179 #[cfg(unix)]
180 {
181 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
182 let sigterm = stream.recv();
183 let sigterm = pin!(sigterm);
184 let ctrl_c = pin!(ctrl_c);
185 let fut = pin!(fut);
186
187 tokio::select! {
188 _ = ctrl_c => {
189 trace!(target: "reth::cli", "Received ctrl-c");
190 },
191 _ = sigterm => {
192 trace!(target: "reth::cli", "Received SIGTERM");
193 },
194 res = fut => res?,
195 }
196 }
197
198 #[cfg(not(unix))]
199 {
200 let ctrl_c = pin!(ctrl_c);
201 let fut = pin!(fut);
202
203 tokio::select! {
204 _ = ctrl_c => {
205 trace!(target: "reth::cli", "Received ctrl-c");
206 },
207 res = fut => res?,
208 }
209 }
210
211 Ok(())
212}