Skip to content

Commit

Permalink
Add Controller::reconcile_on (#1163)
Browse files Browse the repository at this point in the history
* Add Controller::reconcile_on

Signed-off-by: Corentin Regal <[email protected]>

* Comment Controller::reconcile_on and add doctest

Signed-off-by: Corentin Regal <[email protected]>

* Improve PR documentation

Signed-off-by: Corentin Regal <[email protected]>

* FIx PR documentation

Signed-off-by: Corentin Regal <[email protected]>

* Add feature flag

Signed-off-by: Corentin Regal <[email protected]>

---------

Signed-off-by: Corentin Regal <[email protected]>
  • Loading branch information
co42 authored Apr 29, 2023
1 parent 3931c24 commit 546e618
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
3 changes: 2 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ rust-version = "1.63.0"
edition = "2021"

[features]
unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates", "unstable-runtime-stream-control"]
unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates", "unstable-runtime-stream-control", "unstable-runtime-reconcile-on"]
unstable-runtime-subscribe = []
unstable-runtime-predicates = []
unstable-runtime-stream-control = []
unstable-runtime-reconcile-on = []

[package.metadata.docs.rs]
features = ["k8s-openapi/v1_26", "unstable-runtime"]
Expand Down
59 changes: 59 additions & 0 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,65 @@ where
self
}

/// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
///
/// For example, this can be used to watch resources once and use the stream to trigger reconciliation and also keep a cache of those objects.
/// That way it's possible to use this up to date cache instead of querying Kubernetes to access those resources
///
/// # Example:
///
/// ```no_run
/// # async {
/// # use futures::{StreamExt, TryStreamExt};
/// # use k8s_openapi::api::core::v1::{ConfigMap, Pod};
/// # use kube::api::Api;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::reflector::{ObjectRef, Store};
/// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
/// # use kube::runtime::watcher::Config;
/// # use kube::{Client, Error, ResourceExt};
/// # use std::future;
/// # use std::sync::Arc;
/// #
/// # let client: Client = todo!();
/// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Store<Pod>>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Store<Pod>>) -> Action { Action::await_change() }
/// #
/// // Store can be used in the reconciler instead of querying Kube
/// let (pod_store, writer) = reflector::store();
/// let pod_stream = reflector(
/// writer,
/// watcher(Api::<Pod>::all(client.clone()), Config::default()),
/// )
/// .applied_objects()
/// // Map to the relevant `ObjectRef<K>` to reconcile
/// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap()));
///
/// Controller::new(Api::<ConfigMap>::all(client), Config::default())
/// .reconcile_on(pod_stream)
/// // The store can be re-used between controllers and even inspected from the reconciler through [Context]
/// .run(reconcile, error_policy, Arc::new(pod_store))
/// .for_each(|_| future::ready(()))
/// .await;
/// # };
/// ```
#[cfg(feature = "unstable-runtime-reconcile-on")]
#[must_use]
pub fn reconcile_on(
mut self,
trigger: impl Stream<Item = Result<ObjectRef<K>, watcher::Error>> + Send + 'static,
) -> Self {
self.trigger_selector.push(
trigger
.map_ok(move |obj| ReconcileRequest {
obj_ref: obj,
reason: ReconcileReason::Unknown,
})
.boxed(),
);
self
}

/// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
///
/// - No new reconciliations are started from the scheduler
Expand Down

0 comments on commit 546e618

Please sign in to comment.