diff --git a/examples/Cargo.toml b/examples/Cargo.toml index f00405f30..03bb00784 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -181,3 +181,7 @@ required-features = ["native-tls", "rustls-tls"] [[example]] name = "custom_client_trace" path = "custom_client_trace.rs" + +[[example]] +name = "secret_syncer" +path = "secret_syncer.rs" diff --git a/examples/secret_syncer.rs b/examples/secret_syncer.rs new file mode 100644 index 000000000..885953b49 --- /dev/null +++ b/examples/secret_syncer.rs @@ -0,0 +1,109 @@ +// Demonstrates a controller some outside resource that it needs to clean up when the owner is deleted + +// NOTE: This is designed to demonstrate how to use finalizers, but is not in itself a good use case for them. +// If you actually want to clean up other Kubernetes objects then you should use `ownerReferences` instead and let +// k8s garbage collect the children. + +use futures::StreamExt; +use k8s_openapi::api::core::v1::{ConfigMap, Secret}; +use kube::{ + api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource}, + error::ErrorResponse, + Api, +}; +use kube_runtime::{ + controller::{Context, ReconcilerAction}, + finalizer::{finalizer, Event}, + Controller, +}; +use snafu::{OptionExt, ResultExt, Snafu}; +use std::time::Duration; + +#[derive(Debug, Snafu)] +enum Error { + NoName, + NoNamespace, + UpdateSecret { source: kube::Error }, + DeleteSecret { source: kube::Error }, +} +type Result = std::result::Result; + +fn secret_name_for_configmap(cm: &ConfigMap) -> Result { + Ok(format!("cm---{}", cm.metadata.name.as_deref().context(NoName)?)) +} + +async fn apply(cm: ConfigMap, secrets: &kube::Api) -> Result { + println!("Reconciling {:?}", cm); + let secret_name = secret_name_for_configmap(&cm)?; + secrets + .patch( + &secret_name, + &PatchParams::apply("configmap-secret-syncer.nullable.se"), + &Patch::Apply(Secret { + metadata: ObjectMeta { + name: Some(secret_name.clone()), + ..ObjectMeta::default() + }, + string_data: cm.data, + data: cm.binary_data, + ..Secret::default() + }), + ) + .await + .context(UpdateSecret)?; + Ok(ReconcilerAction { requeue_after: None }) +} + +async fn cleanup(cm: ConfigMap, secrets: &kube::Api) -> Result { + println!("Cleaning up {:?}", cm); + secrets + .delete(&secret_name_for_configmap(&cm)?, &DeleteParams::default()) + .await + .map(|_| ()) + .or_else(|err| match err { + // Object is already deleted + kube::Error::Api(ErrorResponse { code: 404, .. }) => Ok(()), + err => Err(err), + }) + .context(DeleteSecret)?; + Ok(ReconcilerAction { requeue_after: None }) +} + +#[tokio::main] +async fn main() -> color_eyre::Result<()> { + env_logger::init(); + let kube = kube::Client::try_default().await?; + let all_cms = kube::Api::::all(kube.clone()); + Controller::new( + all_cms, + ListParams::default().labels("configmap-secret-syncer.nullable.se/sync=true"), + ) + .run( + |cm, _| { + let ns = cm.meta().namespace.as_deref().context(NoNamespace).unwrap(); + let cms: Api = Api::namespaced(kube.clone(), ns); + let secrets: Api = Api::namespaced(kube.clone(), ns); + async move { + finalizer( + &cms, + "configmap-secret-syncer.nullable.se/cleanup", + cm, + |event| async { + match event { + Event::Apply(cm) => apply(cm, &secrets).await, + Event::Cleanup(cm) => cleanup(cm, &secrets).await, + } + }, + ) + .await + } + }, + |_err, _| ReconcilerAction { + requeue_after: Some(Duration::from_secs(2)), + }, + Context::new(()), + ) + .for_each(|msg| async move { println!("Reconciled: {:?}", msg) }) + .await; + Ok(()) +} diff --git a/examples/secret_syncer_configmap.yaml b/examples/secret_syncer_configmap.yaml new file mode 100644 index 000000000..a66c408d3 --- /dev/null +++ b/examples/secret_syncer_configmap.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: secret-syncer-example + labels: + configmap-secret-syncer.nullable.se/sync: "true" +data: + foo: bar + spam: eggs diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 6136fd97d..970e60df4 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -14,7 +14,7 @@ edition = "2018" [dependencies] futures = "0.3.8" -kube = { path = "../kube", version = "^0.57.0", default-features = false, features = ["client"] } +kube = { path = "../kube", version = "^0.57.0", default-features = false, features = ["jsonpatch", "client"] } derivative = "2.1.1" serde = "1.0.118" smallvec = "1.6.0" @@ -24,6 +24,8 @@ snafu = { version = "0.6.10", features = ["futures"] } dashmap = "4.0.1" tokio-util = { version = "0.6.0", features = ["time"] } tracing = "0.1.26" +json-patch = "0.2.6" +serde_json = "1.0.64" [dependencies.k8s-openapi] version = "0.12.0" diff --git a/kube-runtime/src/finalizer.rs b/kube-runtime/src/finalizer.rs new file mode 100644 index 000000000..2fbd7b928 --- /dev/null +++ b/kube-runtime/src/finalizer.rs @@ -0,0 +1,210 @@ +use crate::controller::ReconcilerAction; +use futures::{TryFuture, TryFutureExt}; +use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation}; +use kube::{ + api::{Patch, PatchParams}, + Api, Resource, +}; +use serde::{de::DeserializeOwned, Serialize}; +use snafu::{OptionExt, ResultExt, Snafu}; +use std::{error::Error as StdError, fmt::Debug}; + +#[derive(Debug, Snafu)] +pub enum Error +where + ReconcileErr: StdError + 'static, +{ + #[snafu(display("failed to apply object: {}", source))] + ApplyFailed { source: ReconcileErr }, + #[snafu(display("failed to clean up object: {}", source))] + CleanupFailed { source: ReconcileErr }, + #[snafu(display("failed to add finalizer: {}", source))] + AddFinalizer { source: kube::Error }, + #[snafu(display("failed to remove finalizer: {}", source))] + RemoveFinalizer { source: kube::Error }, + #[snafu(display("object has no name"))] + UnnamedObject, +} + +struct FinalizerState { + finalizer_index: Option, + is_deleting: bool, +} + +impl FinalizerState { + fn for_object(obj: &K, finalizer_name: &str) -> Self { + Self { + finalizer_index: obj + .meta() + .finalizers + .iter() + .enumerate() + .find(|(_, fin)| *fin == finalizer_name) + .map(|(i, _)| i), + is_deleting: obj.meta().deletion_timestamp.is_some(), + } + } +} + +/// Reconcile an object in a way that requires cleanup before an object can be deleted. It does this by +/// managing a [`ObjectMeta::finalizers`] entry, which prevents the object from being deleted before the +/// cleanup is done. +/// +/// In typical usage, if you use `finalizer` then it should be the only top-level "action" +/// in your [`applier`]/[`Controller`]'s `reconcile` function. +/// +/// # Expected Flow +/// +/// 1. User creates object +/// 2. Reconciler sees object +/// 3. `finalizer` adds `finalizer_name` to [`ObjectMeta::finalizers`] +/// 4. Reconciler sees updated object +/// 5. `finalizer` runs [`Event::Apply`] +/// 6. User updates object +/// 7. Reconciler sees updated object +/// 8. `finalizer` runs [`Event::Apply`] +/// 9. User deletes object +/// 10. Reconciler sees deleting object +/// 11. `finalizer` runs [`Event::Cleanup`] +/// 12. `finalizer` removes `finalizer_name` from [`ObjectMeta::finalizers`] +/// 13. Kubernetes sees that all [`ObjectMeta::finalizers`] are gone and finally deletes the object +/// +/// # Guarantees +/// +/// If [`Event::Apply`] is ever started then [`Event::Cleanup`] must succeed before the Kubernetes object deletion completes. +/// +/// # Assumptions +/// +/// `finalizer_name` must be unique among the controllers interacting with the object +/// +/// [`Event::Apply`] and [`Event::Cleanup`] must both be idempotent, and tolerate being executed several times (even if previously cancelled). +/// +/// [`Event::Cleanup`] must tolerate [`Event::Apply`] never having ran at all, or never having succeeded. Keep in mind that +/// even infallible `.await`s are cancellation points. +/// +/// # Caveats +/// +/// Object deletes will get stuck while the controller is not running, or if `cleanup` fails for some reason. +/// +/// `reconcile` should take the object that the [`Event`] contains, rather than trying to reuse `obj`, since it may have been updated. +/// +/// # Errors +/// +/// [`Event::Apply`] and [`Event::Cleanup`] are both fallible, their errors are passed through as [`Error::ApplyFailed`] +/// and [`Error::CleanupFailed`], respectively. +/// +/// In addition, adding and removing the finalizer itself may fail. In particular, this may be because of +/// network errors, lacking permissions, or because another `finalizer` was updated in the meantime on the same object. +pub async fn finalizer( + api: &Api, + finalizer_name: &str, + obj: K, + reconcile: impl FnOnce(Event) -> ReconcileFut, +) -> Result> +where + K: Resource + Clone + DeserializeOwned + Serialize + Debug, + ReconcileFut: TryFuture, + ReconcileFut::Error: StdError + 'static, +{ + match FinalizerState::for_object(&obj, finalizer_name) { + FinalizerState { + finalizer_index: Some(_), + is_deleting: false, + } => reconcile(Event::Apply(obj)) + .into_future() + .await + .context(ApplyFailed), + FinalizerState { + finalizer_index: Some(finalizer_i), + is_deleting: true, + } => { + // Cleanup reconciliation must succeed before it's safe to remove the finalizer + let name = obj.meta().name.clone().context(UnnamedObject)?; + let action = reconcile(Event::Cleanup(obj)) + .into_future() + .await + // Short-circuit, so that we keep the finalizer if cleanup fails + .context(CleanupFailed)?; + // Cleanup was successful, remove the finalizer so that deletion can continue + let finalizer_path = format!("/metadata/finalizers/{}", finalizer_i); + api.patch::( + &name, + &PatchParams::default(), + &Patch::Json(json_patch::Patch(vec![ + // All finalizers run concurrently and we use an integer index + // `Test` ensures that we fail instead of deleting someone else's finalizer + // (in which case a new `Cleanup` event will be sent) + PatchOperation::Test(TestOperation { + path: finalizer_path.clone(), + value: finalizer_name.into(), + }), + PatchOperation::Remove(RemoveOperation { path: finalizer_path }), + ])), + ) + .await + .context(RemoveFinalizer)?; + Ok(action) + } + FinalizerState { + finalizer_index: None, + is_deleting: false, + } => { + // Finalizer must be added before it's safe to run an `Apply` reconciliation + let patch = json_patch::Patch(if obj.meta().finalizers.is_empty() { + vec![ + PatchOperation::Test(TestOperation { + path: "/metadata/finalizers".to_string(), + value: serde_json::Value::Null, + }), + PatchOperation::Add(AddOperation { + path: "/metadata/finalizers".to_string(), + value: vec![finalizer_name].into(), + }), + ] + } else { + vec![PatchOperation::Add(AddOperation { + path: "/metadata/finalizers/-".to_string(), + value: finalizer_name.into(), + })] + }); + api.patch::( + obj.meta().name.as_deref().context(UnnamedObject)?, + &PatchParams::default(), + &Patch::Json(patch), + ) + .await + .context(AddFinalizer)?; + // No point applying here, since the patch will cause a new reconciliation + Ok(ReconcilerAction { requeue_after: None }) + } + FinalizerState { + finalizer_index: None, + is_deleting: true, + } => { + // Our work here is done + Ok(ReconcilerAction { requeue_after: None }) + } + } +} + +/// A representation of an action that should be taken by a reconciler. +pub enum Event { + /// The reconciler should ensure that the actual state matches the state desired in the object. + /// + /// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive): + /// + /// - The controller is restarted + /// - The object is updated + /// - The reconciliation fails + /// - The grinch attacks + Apply(K), + /// The object is being deleted, and the reconciler should remove all resources that it owns. + /// + /// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive): + /// + /// - The controller is restarted while the deletion is in progress + /// - The reconciliation fails + /// - Another finalizer was removed in the meantime + /// - The grinch's heart grows a size or two + Cleanup(K), +} diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index c640b4d6c..87a858355 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -18,12 +18,14 @@ #![allow(clippy::type_repetition_in_bounds)] pub mod controller; +pub mod finalizer; pub mod reflector; pub mod scheduler; pub mod utils; pub mod watcher; pub use controller::{applier, Controller}; +pub use finalizer::finalizer; pub use reflector::reflector; pub use scheduler::scheduler; pub use watcher::watcher;