Merge pull request #573 from Appva/feature/graceful-controller-shutdown
Graceful controller shutdown
nightkr authored Jul 2, 2021
2 parents f1b1ced + b884476 commit c84110f
Showing 7 changed files with 314 additions and 101 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,9 @@ UNRELEASED
* see https://github.com/clux/kube-rs/compare/0.57.0...master
* `kube`: `BREAKING`: subresource marker traits renamed to a new conjugation: `Log`, `Execute`, `Attach`, `Evict` (previously `Logging`, `Executable`, `Attachable`, `Evictable`) - #536 via #560
* `kube-derive` added `#[kube(category)]` attr to set [CRD categories](https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#categories) - #559
* `kube-runtime` added `Controller::graceful_shutdown_on` for shutting down the `Controller` while waiting for running reconciliations to finish (usage sketch below)
- BREAKING: `controller::applier` now starts a graceful shutdown when the `queue` terminates
- BREAKING: `scheduler` now shuts down immediately when `requests` terminates, rather than waiting for the pending reconciliations to drain
* `kube-runtime` added tracking for reconciliation reason
- BREAKING: `Controller::owns` and `Controller::watches` now take a `dyntype` argument. If the watched type is static at compile-time then you can simply pass `()`
- BREAKING: `controller::trigger_*` now returns a `ReconcileRequest` rather than `ObjectRef`. The `ObjectRef` can be accessed via the `obj_ref` field
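
A minimal usage sketch of the new graceful-shutdown hook (illustrative only, not part of this diff; `cmgs`, `cms`, `reconcile`, `error_policy`, `Data`, and `client` are assumed from the configmapgen example below):

    use futures::{FutureExt, StreamExt};

    Controller::new(cmgs, ListParams::default())
        .owns(cms, (), ListParams::default()) // `()` is the new dyntype argument for compile-time-static types
        // Resolving this future starts a graceful shutdown: no new reconciliations
        // are started, the watch terminates, and in-flight reconciliations finish.
        .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
        .run(reconcile, error_policy, Context::new(Data { client }))
        .for_each(|res| async move {
            if let Err(err) = res {
                log::warn!("reconcile failed: {}", err);
            }
        })
        .await;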
22 changes: 20 additions & 2 deletions examples/configmapgen_controller.rs
@@ -13,7 +13,7 @@ use kube_runtime::controller::{Context, Controller, ReconcilerAction};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use std::collections::BTreeMap;
use std::{collections::BTreeMap, io::BufRead};
use tokio::time::Duration;

#[derive(Debug, Snafu)]
@@ -54,6 +54,10 @@ fn object_to_owner_reference<K: Resource<DynamicType = ()>>(

/// Controller triggers this whenever our main object or our children changed
async fn reconcile(generator: ConfigMapGenerator, ctx: Context<Data>) -> Result<ReconcilerAction, Error> {
log::info!("working hard");
tokio::time::sleep(Duration::from_secs(2)).await;
log::info!("hard work is done!");

let client = ctx.get_ref().client.clone();

let mut contents = BTreeMap::new();
@@ -112,8 +116,22 @@ async fn main() -> Result<()> {
let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
let cms = Api::<ConfigMap>::all(client.clone());

log::info!("starting configmapgen-controller");
log::info!("press <enter> to force a reconciliation of all objects");

let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
for _ in std::io::BufReader::new(std::io::stdin()).lines() {
let _ = reload_tx.try_send(());
}
});

Controller::new(cmgs, ListParams::default())
.owns(cms, (), ListParams::default())
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Context::new(Data { client }))
.for_each(|res| async move {
match res {
@@ -122,6 +140,6 @@
}
})
.await;

log::info!("controller terminated");
Ok(())
}
1 change: 0 additions & 1 deletion kube-runtime/Cargo.toml
@@ -36,7 +36,6 @@ serde_json = "1.0.61"
tokio = { version = "1.0.1", features = ["full", "test-util"] }
rand = "0.8.0"
schemars = "0.8.0"
tokio-stream = { version = "0.1.6", features = ["io-util"] }

[dev-dependencies.k8s-openapi]
version = "0.12.0"
167 changes: 152 additions & 15 deletions kube-runtime/src/controller/mod.rs
@@ -8,13 +8,17 @@ use crate::{
ObjectRef,
},
scheduler::{self, scheduler, ScheduleRequest},
utils::{try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle},
utils::{
try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle,
KubeRuntimeStreamExt,
},
watcher::{self, watcher},
};
use derivative::Derivative;
use futures::{
channel, future, stream, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream,
TryStreamExt,
channel,
future::{self, BoxFuture},
stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube::api::{Api, DynamicObject, ListParams, Resource};
use serde::de::DeserializeOwned;
@@ -237,6 +241,7 @@ where
QueueStream::Ok: Into<ReconcileRequest<K>>,
QueueStream::Error: std::error::Error + 'static,
{
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
let err_context = context.clone();
let (scheduler_tx, scheduler_rx) = channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(100);
// Create a stream of ObjectRefs that need to be reconciled
@@ -247,9 +252,17 @@
queue.context(QueueError).map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
// 2. requests sent to scheduler_tx
scheduler_rx.map(Ok),
scheduler_rx
.map(Ok)
.take_until(scheduler_shutdown_rx)
.on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
)),
// all the Oks from the select gets passed through the scheduler stream, and are then executed
move |s| {
@@ -277,8 +290,10 @@
})
.context(SchedulerDequeueFailed)
.map(|res| res.and_then(|x| x))
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
// finally, for each completed reconcile call:
.and_then(move |(obj_ref, reconciler_result)| {
let (ReconcilerAction { requeue_after }, requeue_reason) = match &reconciler_result {
@@ -293,19 +308,20 @@
async move {
// Transmit the requeue request to the scheduler (picked up again at top)
if let Some(delay) = requeue_after {
scheduler_tx
// Failure to schedule item = in graceful shutdown mode, ignore
let _ = scheduler_tx
.send(ScheduleRequest {
message: ReconcileRequest {obj_ref: obj_ref.clone(), reason: requeue_reason},
run_at: Instant::now() + delay,
})
.await
.expect("Message could not be sent to scheduler_rx");
.await;
}
reconciler_result
.map(|action| (obj_ref, action))
.context(ReconcilerFailed)
}
})
.on_complete(async { tracing::debug!("applier terminated") })
}

/// Controller
@@ -380,8 +396,15 @@
K::DynamicType: Eq + Hash,
{
// NB: Need to Unpin for stream::select_all
// TODO: get an arbitrary std::error::Error in here?
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
/// [`run`] starts a graceful shutdown when any of these [`Future`]s complete,
/// refusing to start any new reconciliations but letting any existing ones finish.
graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
/// [`run`] terminates immediately when any of these [`Future`]s complete,
/// requesting that all running reconciliations be aborted.
/// However, note that they *will* keep running until their next yield point (`.await`),
/// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
dyntype: K::DynamicType,
reader: Store<K>,
}
@@ -425,6 +448,14 @@
trigger_selector.push(self_watcher);
Self {
trigger_selector,
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
],
forceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
],
dyntype,
reader,
}
@@ -500,19 +531,25 @@
/// To reconcile all objects when a new line is entered:
///
/// ```rust
/// # async fn foo() {
/// # async {
/// use futures::stream::StreamExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use kube::{api::ListParams, Api, Client, ResourceExt};
/// use kube_runtime::controller::{Context, Controller, ReconcilerAction};
/// use std::convert::Infallible;
/// use tokio::io::{stdin, AsyncBufReadExt, BufReader};
/// use tokio_stream::wrappers::LinesStream;
/// use std::{convert::Infallible, io::BufRead};
/// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
/// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
/// // and its worker prevents the Tokio runtime from shutting down.
/// std::thread::spawn(move || {
/// for _ in std::io::BufReader::new(std::io::stdin()).lines() {
/// let _ = reload_tx.try_send(());
/// }
/// });
/// Controller::new(
/// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
/// ListParams::default(),
/// )
/// .reconcile_all_on(LinesStream::new(BufReader::new(stdin()).lines()).map(|_| ()))
/// .reconcile_all_on(reload_rx.map(|_| ()))
/// .run(
/// |o, _| async move {
/// println!("Reconciling {}", o.name());
@@ -521,8 +558,12 @@
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
/// );
/// # }
/// # };
/// ```
///
/// This can be called multiple times, in which case the triggers are additive; reconciliations are scheduled whenever *any* [`Stream`] emits a new item.
///
/// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
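/// For example, a sketch stacking a manual reload channel on top of a periodic resync
/// (assumed: `controller` is a `Controller`, and `manual_rx` and `resync_rx` are
/// `Stream<Item = ()>`s obtained elsewhere):
///
/// ```rust,ignore
/// controller
///     .reconcile_all_on(manual_rx)
///     .reconcile_all_on(resync_rx)
/// ```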
pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
let store = self.store();
let dyntype = self.dyntype.clone();
@@ -542,6 +583,100 @@
self
}

/// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
///
/// - No new reconciliations are started from the scheduler
/// - The underlying Kubernetes watch is terminated
/// - All running reconciliations are allowed to finish
/// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
///
/// For example, to stop the reconciler whenever the user presses Ctrl+C:
///
/// ```rust
/// # async {
/// use futures::future::FutureExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use kube::{api::ListParams, Api, Client, ResourceExt};
/// use kube_runtime::controller::{Context, Controller, ReconcilerAction};
/// use std::convert::Infallible;
/// Controller::new(
/// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
/// ListParams::default(),
/// )
/// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
/// .run(
/// |o, _| async move {
/// println!("Reconciling {}", o.name());
/// Ok(ReconcilerAction { requeue_after: None })
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
/// );
/// # };
/// ```
///
/// This can be called multiple times, in which case the triggers are additive; the [`Controller`] starts to terminate
/// as soon as *any* [`Future`] resolves.
pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
self.graceful_shutdown_selector.push(trigger.boxed());
self
}

/// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
///
/// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
/// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
///
/// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
/// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
/// all other tasks either terminate when the [`Controller`] does, have their own signal handlers,
/// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
///
/// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
/// [`Controller::graceful_shutdown_on`].
///
/// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
/// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
/// unless you run [`std::process::exit`] afterwards.
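///
/// A sketch of that pattern (illustrative only; `controller`, `reconcile`, `error_policy`,
/// and `context` are assumed from the surrounding examples):
///
/// ```rust,ignore
/// use futures::stream::StreamExt;
/// controller
///     .shutdown_on_signal()
///     .run(reconcile, error_policy, context)
///     .for_each(|_| futures::future::ready(()))
///     .await;
/// // Aborted reconcilers may still be running after a forceful shutdown;
/// // exit explicitly so they cannot block Runtime teardown.
/// std::process::exit(0);
/// ```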
pub fn shutdown_on_signal(mut self) -> Self {
async fn shutdown_signal() {
futures::future::select(
tokio::signal::ctrl_c().map(|_| ()).boxed(),
#[cfg(unix)]
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.unwrap()
.recv()
.map(|_| ())
.boxed(),
// Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
#[cfg(not(unix))]
futures::future::pending(),
)
.await;
}

let (graceful_tx, graceful_rx) = channel::oneshot::channel();
self.graceful_shutdown_selector
.push(graceful_rx.map(|_| ()).boxed());
self.forceful_shutdown_selector.push(
async {
tracing::info!("press ctrl+c to shut down gracefully");
shutdown_signal().await;
if let Ok(()) = graceful_tx.send(()) {
tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
} else {
tracing::info!(
"graceful shutdown already requested, press ctrl+c again to force shutdown"
);
}
shutdown_signal().await;
tracing::info!("forced shutdown requested");
}
.boxed(),
);
self
}

/// Consume all the parameters of the Controller and start the applier stream
///
/// This creates a stream from all builder calls and starts an applier with
Expand All @@ -568,8 +703,10 @@ where
error_policy,
context,
self.reader,
self.trigger_selector,
self.trigger_selector
.take_until(future::select_all(self.graceful_shutdown_selector)),
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
}

13 changes: 10 additions & 3 deletions kube-runtime/src/controller/runner.rs
@@ -93,7 +93,7 @@ mod tests {
use crate::scheduler::{scheduler, ScheduleRequest};
use futures::{
channel::{mpsc, oneshot},
poll, SinkExt, TryStreamExt,
future, poll, SinkExt, TryStreamExt,
};
use std::{cell::RefCell, time::Duration};
use tokio::{
@@ -135,8 +135,15 @@
})
.await
.unwrap();
drop(sched_tx);
runner.await.unwrap();
let ((), run) = future::join(
async {
tokio::time::sleep(Duration::from_secs(5)).await;
drop(sched_tx);
},
runner,
)
.await;
run.unwrap();
// Validate that we actually ran both requests
assert_eq!(count, 2);
}