Skip to content

Commit 14004e9

Browse files
authored
chore(core): Expose shutdown errors (#18153)
* Refactor app signal handling into functions
* Introduce `ShutdownError` to pass along the cause of a shutdown
* Add handling of component shutdown errors
* Add component name to shutdown errors
* Add error message to component errors
* Add API error message
* Add snafu-based display messages for the shutdown errors
* Fix broken dep in Cargo.lock
* Fix Windows compile error
1 parent 0c1cf23 commit 14004e9

File tree

9 files changed

+174
-107
lines changed

9 files changed

+174
-107
lines changed

src/app.rs

+97-80
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
2828
config::{self, Config, ConfigPath},
2929
heartbeat,
30-
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
30+
signal::{ShutdownError, SignalHandler, SignalPair, SignalRx, SignalTo},
3131
topology::{
3232
self, ReloadOutcome, RunningTopology, SharedTopologyController, TopologyController,
3333
},
@@ -49,8 +49,8 @@ use tokio::sync::broadcast::error::RecvError;
4949
pub struct ApplicationConfig {
5050
pub config_paths: Vec<config::ConfigPath>,
5151
pub topology: RunningTopology,
52-
pub graceful_crash_sender: mpsc::UnboundedSender<()>,
53-
pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
52+
pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
53+
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
5454
#[cfg(feature = "api")]
5555
pub api: config::api::Options,
5656
#[cfg(feature = "enterprise")]
@@ -139,9 +139,12 @@ impl ApplicationConfig {
139139

140140
Some(api_server)
141141
}
142-
Err(e) => {
143-
error!("An error occurred that Vector couldn't handle: {}.", e);
144-
_ = self.graceful_crash_sender.send(());
142+
Err(error) => {
143+
let error = error.to_string();
144+
error!("An error occurred that Vector couldn't handle: {}.", error);
145+
_ = self
146+
.graceful_crash_sender
147+
.send(ShutdownError::ApiFailed { error });
145148
None
146149
}
147150
}
@@ -245,7 +248,7 @@ impl Application {
245248

246249
pub struct StartedApplication {
247250
pub config_paths: Vec<ConfigPath>,
248-
pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
251+
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
249252
pub signals: SignalPair,
250253
pub topology_controller: SharedTopologyController,
251254
}
@@ -270,42 +273,19 @@ impl StartedApplication {
270273

271274
let signal = loop {
272275
tokio::select! {
273-
signal = signal_rx.recv() => {
274-
match signal {
275-
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
276-
let mut topology_controller = topology_controller.lock().await;
277-
let new_config = config_builder.build().map_err(handle_config_errors).ok();
278-
if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
279-
break SignalTo::Shutdown;
280-
}
281-
}
282-
Ok(SignalTo::ReloadFromDisk) => {
283-
let mut topology_controller = topology_controller.lock().await;
284-
285-
// Reload paths
286-
if let Some(paths) = config::process_paths(&config_paths) {
287-
topology_controller.config_paths = paths;
288-
}
289-
290-
// Reload config
291-
let new_config = config::load_from_paths_with_provider_and_secrets(&topology_controller.config_paths, &mut signal_handler)
292-
.await
293-
.map_err(handle_config_errors).ok();
294-
295-
if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
296-
break SignalTo::Shutdown;
297-
}
298-
},
299-
Err(RecvError::Lagged(amt)) => warn!("Overflow, dropped {} signals.", amt),
300-
Err(RecvError::Closed) => break SignalTo::Shutdown,
301-
Ok(signal) => break signal,
302-
}
303-
}
276+
signal = signal_rx.recv() => if let Some(signal) = handle_signal(
277+
signal,
278+
&topology_controller,
279+
&config_paths,
280+
&mut signal_handler,
281+
).await {
282+
break signal;
283+
},
304284
// Trigger graceful shutdown if a component crashed, or all sources have ended.
305-
_ = graceful_crash.next() => break SignalTo::Shutdown,
285+
error = graceful_crash.next() => break SignalTo::Shutdown(error),
306286
_ = TopologyController::sources_finished(topology_controller.clone()) => {
307287
info!("All sources have finished.");
308-
break SignalTo::Shutdown
288+
break SignalTo::Shutdown(None)
309289
} ,
310290
else => unreachable!("Signal streams never end"),
311291
}
@@ -319,6 +299,53 @@ impl StartedApplication {
319299
}
320300
}
321301

302+
async fn handle_signal(
303+
signal: Result<SignalTo, RecvError>,
304+
topology_controller: &SharedTopologyController,
305+
config_paths: &[ConfigPath],
306+
signal_handler: &mut SignalHandler,
307+
) -> Option<SignalTo> {
308+
match signal {
309+
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
310+
let mut topology_controller = topology_controller.lock().await;
311+
let new_config = config_builder.build().map_err(handle_config_errors).ok();
312+
match topology_controller.reload(new_config).await {
313+
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
314+
_ => None,
315+
}
316+
}
317+
Ok(SignalTo::ReloadFromDisk) => {
318+
let mut topology_controller = topology_controller.lock().await;
319+
320+
// Reload paths
321+
if let Some(paths) = config::process_paths(config_paths) {
322+
topology_controller.config_paths = paths;
323+
}
324+
325+
// Reload config
326+
let new_config = config::load_from_paths_with_provider_and_secrets(
327+
&topology_controller.config_paths,
328+
signal_handler,
329+
)
330+
.await
331+
.map_err(handle_config_errors)
332+
.ok();
333+
334+
match topology_controller.reload(new_config).await {
335+
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
336+
_ => None,
337+
}
338+
}
339+
Err(RecvError::Lagged(amt)) => {
340+
warn!("Overflow, dropped {} signals.", amt);
341+
None
342+
}
343+
Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
344+
Ok(signal) => Some(signal),
345+
}
346+
}
347+
348+
#[derive(Debug)]
322349
pub struct FinishedApplication {
323350
pub signal: SignalTo,
324351
pub signal_rx: SignalRx,
@@ -329,7 +356,7 @@ impl FinishedApplication {
329356
pub async fn shutdown(self) -> ExitStatus {
330357
let FinishedApplication {
331358
signal,
332-
mut signal_rx,
359+
signal_rx,
333360
topology_controller,
334361
} = self;
335362

@@ -341,49 +368,39 @@ impl FinishedApplication {
341368
.into_inner();
342369

343370
match signal {
344-
SignalTo::Shutdown => {
345-
emit!(VectorStopped);
346-
tokio::select! {
347-
_ = topology_controller.stop() => ExitStatus::from_raw({
348-
#[cfg(windows)]
349-
{
350-
exitcode::OK as u32
351-
}
352-
#[cfg(unix)]
353-
exitcode::OK
354-
}), // Graceful shutdown finished
355-
_ = signal_rx.recv() => {
356-
// It is highly unlikely that this event will exit from topology.
357-
emit!(VectorQuit);
358-
// Dropping the shutdown future will immediately shut the server down
359-
ExitStatus::from_raw({
360-
#[cfg(windows)]
361-
{
362-
exitcode::UNAVAILABLE as u32
363-
}
364-
#[cfg(unix)]
365-
exitcode::OK
366-
})
367-
}
371+
SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
372+
SignalTo::Quit => Self::quit(),
373+
_ => unreachable!(),
374+
}
375+
}
368376

377+
async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
378+
emit!(VectorStopped);
379+
tokio::select! {
380+
_ = topology_controller.stop() => ExitStatus::from_raw({
381+
#[cfg(windows)]
382+
{
383+
exitcode::OK as u32
369384
}
370-
}
371-
SignalTo::Quit => {
372-
// It is highly unlikely that this event will exit from topology.
373-
emit!(VectorQuit);
374-
drop(topology_controller);
375-
ExitStatus::from_raw({
376-
#[cfg(windows)]
377-
{
378-
exitcode::UNAVAILABLE as u32
379-
}
380-
#[cfg(unix)]
381-
exitcode::OK
382-
})
383-
}
384-
_ => unreachable!(),
385+
#[cfg(unix)]
386+
exitcode::OK
387+
}), // Graceful shutdown finished
388+
_ = signal_rx.recv() => Self::quit(),
385389
}
386390
}
391+
392+
fn quit() -> ExitStatus {
393+
// It is highly unlikely that this event will exit from topology.
394+
emit!(VectorQuit);
395+
ExitStatus::from_raw({
396+
#[cfg(windows)]
397+
{
398+
exitcode::UNAVAILABLE as u32
399+
}
400+
#[cfg(unix)]
401+
exitcode::OK
402+
})
403+
}
387404
}
388405

389406
pub fn init_global() {

src/secrets/exec.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ async fn query_backend(
134134
loop {
135135
tokio::select! {
136136
biased;
137-
Ok(signal::SignalTo::Shutdown | signal::SignalTo::Quit) = signal_rx.recv() => {
137+
Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
138138
drop(command);
139139
return Err("Secret retrieval was interrupted.".into());
140140
}

src/signal.rs

+22-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#![allow(missing_docs)]
22

3+
use snafu::Snafu;
34
use tokio::{runtime::Runtime, sync::broadcast};
45
use tokio_stream::{Stream, StreamExt};
56

6-
use super::config::ConfigBuilder;
7+
use super::config::{ComponentKey, ConfigBuilder};
78

89
pub type ShutdownTx = broadcast::Sender<()>;
910
pub type SignalTx = broadcast::Sender<SignalTo>;
@@ -18,11 +19,27 @@ pub enum SignalTo {
1819
/// Signal to reload config from the filesystem.
1920
ReloadFromDisk,
2021
/// Signal to shutdown process.
21-
Shutdown,
22+
Shutdown(Option<ShutdownError>),
2223
/// Shutdown process immediately.
2324
Quit,
2425
}
2526

27+
#[derive(Clone, Debug, Snafu)]
28+
pub enum ShutdownError {
29+
// For future work: It would be nice if we could keep the actual errors in here, but
30+
// `crate::Error` doesn't implement `Clone`, and adding `DynClone` for errors is tricky.
31+
#[snafu(display("The API failed to start: {error}"))]
32+
ApiFailed { error: String },
33+
#[snafu(display("Reload failed, and then failed to restore the previous config"))]
34+
ReloadFailedToRestore,
35+
#[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
36+
SourceAborted { key: ComponentKey, error: String },
37+
#[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
38+
TransformAborted { key: ComponentKey, error: String },
39+
#[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
40+
SinkAborted { key: ComponentKey, error: String },
41+
}
42+
2643
/// Convenience struct for app setup handling.
2744
pub struct SignalPair {
2845
pub handler: SignalHandler,
@@ -153,11 +170,11 @@ fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> {
153170
let signal = tokio::select! {
154171
_ = sigint.recv() => {
155172
info!(message = "Signal received.", signal = "SIGINT");
156-
SignalTo::Shutdown
173+
SignalTo::Shutdown(None)
157174
},
158175
_ = sigterm.recv() => {
159176
info!(message = "Signal received.", signal = "SIGTERM");
160-
SignalTo::Shutdown
177+
SignalTo::Shutdown(None)
161178
} ,
162179
_ = sigquit.recv() => {
163180
info!(message = "Signal received.", signal = "SIGQUIT");
@@ -181,7 +198,7 @@ fn os_signals(_: &Runtime) -> impl Stream<Item = SignalTo> {
181198

182199
async_stream::stream! {
183200
loop {
184-
let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown).await;
201+
let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
185202
yield signal;
186203
}
187204
}

src/sinks/datadog/traces/apm_stats/integration_tests.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use tokio::time::{sleep, Duration};
1616

1717
use crate::{
1818
config::ConfigBuilder,
19+
signal::ShutdownError,
1920
sinks::datadog::traces::{apm_stats::StatsPayload, DatadogTracesConfig},
2021
sources::datadog_agent::DatadogAgentConfig,
2122
test_util::{start_topology, trace_init},
@@ -322,8 +323,8 @@ fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) {
322323
async fn start_vector() -> (
323324
RunningTopology,
324325
(
325-
tokio::sync::mpsc::UnboundedSender<()>,
326-
tokio::sync::mpsc::UnboundedReceiver<()>,
326+
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
327+
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
327328
),
328329
) {
329330
let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port());

src/tap/cmd.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitC
5353
loop {
5454
tokio::select! {
5555
biased;
56-
Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break,
56+
Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
5757
status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
5858
if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect {
5959
#[allow(clippy::print_stderr)]

src/test_util/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use zstd::Decoder as ZstdDecoder;
4141

4242
use crate::{
4343
config::{Config, ConfigDiff, GenerateConfig},
44+
signal::ShutdownError,
4445
topology::{self, RunningTopology},
4546
trace,
4647
};
@@ -683,8 +684,8 @@ pub async fn start_topology(
683684
) -> (
684685
RunningTopology,
685686
(
686-
tokio::sync::mpsc::UnboundedSender<()>,
687-
tokio::sync::mpsc::UnboundedReceiver<()>,
687+
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
688+
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
688689
),
689690
) {
690691
config.healthchecks.set_require_healthy(require_healthy);

0 commit comments

Comments
 (0)