From 546e6186c528518129305513407024e851251e7a Mon Sep 17 00:00:00 2001 From: Corentin REGAL Date: Sat, 29 Apr 2023 16:47:28 +0200 Subject: [PATCH] Add Controller::reconcile_on (#1163) * Add Controller::reconcile_on Signed-off-by: Corentin Regal * Comment Controller::reconcile_on and add doctest Signed-off-by: Corentin Regal * Improve PR documentation Signed-off-by: Corentin Regal * FIx PR documentation Signed-off-by: Corentin Regal * Add feature flag Signed-off-by: Corentin Regal --------- Signed-off-by: Corentin Regal --- kube-runtime/Cargo.toml | 3 +- kube-runtime/src/controller/mod.rs | 59 ++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index e1f15742a..0c8beecfd 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -15,10 +15,11 @@ rust-version = "1.63.0" edition = "2021" [features] -unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates", "unstable-runtime-stream-control"] +unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates", "unstable-runtime-stream-control", "unstable-runtime-reconcile-on"] unstable-runtime-subscribe = [] unstable-runtime-predicates = [] unstable-runtime-stream-control = [] +unstable-runtime-reconcile-on = [] [package.metadata.docs.rs] features = ["k8s-openapi/v1_26", "unstable-runtime"] diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 0c5d80a70..526aacf95 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1011,6 +1011,65 @@ where self } + /// Trigger the reconciliation process for a managed object `ObjectRef` whenever `trigger` emits a value + /// + /// For example, this can be used to watch resources once and use the stream to trigger reconciliation and also keep a cache of those objects. + /// That way it's possible to use this up to date cache instead of querying Kubernetes to access those resources + /// + /// # Example: + /// + /// ```no_run + /// # async { + /// # use futures::{StreamExt, TryStreamExt}; + /// # use k8s_openapi::api::core::v1::{ConfigMap, Pod}; + /// # use kube::api::Api; + /// # use kube::runtime::controller::Action; + /// # use kube::runtime::reflector::{ObjectRef, Store}; + /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt}; + /// # use kube::runtime::watcher::Config; + /// # use kube::{Client, Error, ResourceExt}; + /// # use std::future; + /// # use std::sync::Arc; + /// # + /// # let client: Client = todo!(); + /// # async fn reconcile(_: Arc, _: Arc>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc>) -> Action { Action::await_change() } + /// # + /// // Store can be used in the reconciler instead of querying Kube + /// let (pod_store, writer) = reflector::store(); + /// let pod_stream = reflector( + /// writer, + /// watcher(Api::::all(client.clone()), Config::default()), + /// ) + /// .applied_objects() + /// // Map to the relevant `ObjectRef` to reconcile + /// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap())); + /// + /// Controller::new(Api::::all(client), Config::default()) + /// .reconcile_on(pod_stream) + /// // The store can be re-used between controllers and even inspected from the reconciler through [Context] + /// .run(reconcile, error_policy, Arc::new(pod_store)) + /// .for_each(|_| future::ready(())) + /// .await; + /// # }; + /// ``` + #[cfg(feature = "unstable-runtime-reconcile-on")] + #[must_use] + pub fn reconcile_on( + mut self, + trigger: impl Stream, watcher::Error>> + Send + 'static, + ) -> Self { + self.trigger_selector.push( + trigger + .map_ok(move |obj| ReconcileRequest { + obj_ref: obj, + reason: ReconcileReason::Unknown, + }) + .boxed(), + ); + self + } + /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated: /// /// - No new reconciliations are started from the scheduler