Skip to content

Commit

Permalink
Provider process termination on Ctrl-C (Windows bugfix)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwalski committed Jan 4, 2024
1 parent ec4f0a0 commit f23cc80
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 83 deletions.
6 changes: 4 additions & 2 deletions agent/provider/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use actix::Actor;
use std::env;
use structopt::{clap, StructOpt};
use ya_provider::signal::SignalMonitor;

use ya_provider::provider_agent::{Initialize, ProviderAgent, Shutdown};
use ya_provider::signal::SignalMonitor;
use ya_provider::startup_config::{Commands, StartupConfig};
use ya_utils_process::lock::ProcLock;

Expand Down Expand Up @@ -35,9 +35,11 @@ async fn main() -> anyhow::Result<()> {
let agent = ProviderAgent::new(args, config).await?.start();
agent.send(Initialize).await??;

let (_, signal) = SignalMonitor::default().await;
let signal = SignalMonitor::new().recv().await?;
log::info!("{} received, Shutting down {}...", signal, app_name);

agent.send(Shutdown).await??;

Ok(())
}
Commands::Config(config_cmd) => config_cmd.run(config),
Expand Down
126 changes: 45 additions & 81 deletions agent/provider/src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,98 +1,62 @@
use futures::channel::mpsc;
use futures::{Future, SinkExt, Stream};
use futures_util::task::{Context, Poll};
use std::pin::Pin;

use signal_hook::{
consts::*,
low_level::{register, unregister},
SigId,
pub(crate) type Signal = &'static str;

use tokio::task::JoinHandle;
use tokio::{
select,
sync::{
oneshot,
oneshot::{Receiver, Sender},
},
};

pub(crate) type Signal = (i32, &'static str);
#[cfg(target_family = "unix")]
use tokio::signal::unix;
#[cfg(target_family = "windows")]
use tokio::signal::windows;

pub struct SignalMonitor {
rx: mpsc::Receiver<Signal>,
hooks: Vec<SigId>,
stop_tx: Sender<Signal>,
stop_rx: Receiver<Signal>,
}

impl SignalMonitor {
pub fn new(signals: Vec<i32>) -> Self {
let (tx, rx) = mpsc::channel(1);
let hooks = signals
.into_iter()
.map(|s| register_signal(tx.clone(), s))
.collect();

SignalMonitor { rx, hooks }
}
}

impl Default for SignalMonitor {
fn default() -> Self {
#[allow(unused)]
let mut signals = vec![SIGABRT, SIGINT, SIGTERM];

#[cfg(not(windows))]
signals.push(SIGQUIT);

Self::new(signals)
pub fn new() -> Self {

Check failure on line 23 in agent/provider/src/signal.rs

View workflow job for this annotation

GitHub Actions / Check formatting

you should consider adding a `Default` implementation for `SignalMonitor`
let (stop_tx, stop_rx) = oneshot::channel();
Self { stop_tx, stop_rx }
}
}

impl Future for SignalMonitor {
type Output = Signal;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(Some(s)) => Poll::Ready(s),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
pub async fn recv(self) -> anyhow::Result<Signal> {
Self::start(self.stop_tx)?;

Check failure on line 29 in agent/provider/src/signal.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

mismatched types
Ok(self.stop_rx.await?)
}
}

impl Drop for SignalMonitor {
fn drop(&mut self) {
std::mem::take(&mut self.hooks).into_iter().for_each(|s| {
unregister(s);
});
#[cfg(target_family = "unix")]
fn start(stop_tx: Sender<Signal>) -> anyhow::Result<JoinHandle<()>> {
let mut sigterm = unix::signal(unix::SignalKind::terminate())?;
let mut sigint = unix::signal(unix::SignalKind::interrupt())?;
let mut sigquit = unix::signal(unix::SignalKind::quit())?;
Ok(tokio::spawn(async move {
select! {
_ = sigterm.recv() => stop_tx.send("SIGTERM").expect("Failed to handle SIGTERM event"),
_ = sigint.recv() => stop_tx.send("SIGINT").expect("Failed to handle SIGINT event"),
_ = sigquit.recv() => stop_tx.send("SIGQUIT").expect("Failed to handle SIGQUIT event"),
};
}))
}
}

fn register_signal(tx: mpsc::Sender<Signal>, signal: i32) -> SigId {
log::trace!("Registering signal {} ({})", signal_to_str(signal), signal);

let action = move || {
let mut tx = tx.clone();
#[cfg(target_family = "windows")]
fn start(stop_tx: Receiver<Signal>) -> anyhow::Result<JoinHandle<()>> {

Check failure on line 48 in agent/provider/src/signal.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

mismatched types
let mut ctrl_c = windows::ctrl_c()?;
let mut ctrl_close = windows::ctrl_close()?;
let mut ctrl_logoff = windows::ctrl_logoff()?;
let mut ctrl_shutdown = windows::ctrl_shutdown()?;
tokio::spawn(async move {
let signal_pair = (signal, signal_to_str(signal));
if let Err(e) = tx.send(signal_pair).await {
log::error!("Unable to notify about signal {:?}: {}", signal_pair, e);
}
select! {
_ = ctrl_c.recv() => stop_tx.send("CTRL-C").expect("Failed to handle CTRL-C event"),

Check failure on line 55 in agent/provider/src/signal.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

no method named `send` found for struct `tokio::sync::oneshot::Receiver` in the current scope
_ = ctrl_close.recv() => stop_tx.send("CTRL-CLOSE").expect("Failed to handle CTRL-CLOSE event"),

Check failure on line 56 in agent/provider/src/signal.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

no method named `send` found for struct `tokio::sync::oneshot::Receiver` in the current scope
_ = ctrl_logoff.recv() => stop_tx.send("CTRL-LOGOFF").expect("Failed to handle CTRL-LOGOFF event"),

Check failure on line 57 in agent/provider/src/signal.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

no method named `send` found for struct `tokio::sync::oneshot::Receiver` in the current scope
_ = ctrl_shutdown.recv() => stop_tx.send("CTRL-SHUTDOWN").expect("Failed to handle SHUTDOWN event"),

Check failure on line 58 in agent/provider/src/signal.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

no method named `send` found for struct `tokio::sync::oneshot::Receiver` in the current scope
};
});
};

unsafe { register(signal, action) }.unwrap()
}

fn signal_to_str(signal: i32) -> &'static str {
match signal {
#[cfg(not(windows))]
SIGHUP => "SIGHUP",
#[cfg(not(windows))]
SIGQUIT => "SIGQUIT",
#[cfg(not(windows))]
SIGKILL => "SIGKILL",
#[cfg(not(windows))]
SIGPIPE => "SIGPIPE",
#[cfg(not(windows))]
SIGALRM => "SIGALRM",
SIGINT => "SIGINT",
SIGILL => "SIGILL",
SIGABRT => "SIGABRT",
SIGFPE => "SIGFPE",
SIGSEGV => "SIGSEGV",
SIGTERM => "SIGTERM",
_ => "SIG?",
}
}

0 comments on commit f23cc80

Please sign in to comment.