Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finalizer helper #475

Merged
merged 11 commits into from
Jul 2, 2021
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,7 @@ required-features = ["native-tls", "rustls-tls"]
[[example]]
name = "custom_client_trace"
path = "custom_client_trace.rs"

[[example]]
name = "secret_syncer"
path = "secret_syncer.rs"
109 changes: 109 additions & 0 deletions examples/secret_syncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Demonstrates a controller some outside resource that it needs to clean up when the owner is deleted

// NOTE: This is designed to demonstrate how to use finalizers, but is not in itself a good use case for them.
// If you actually want to clean up other Kubernetes objects then you should use `ownerReferences` instead and let
// k8s garbage collect the children.

use futures::StreamExt;
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
use kube::{
api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource},
error::ErrorResponse,
Api,
};
use kube_runtime::{
controller::{Context, ReconcilerAction},
finalizer::{finalizer, Event},
Controller,
};
use snafu::{OptionExt, ResultExt, Snafu};
use std::time::Duration;

#[derive(Debug, Snafu)]
enum Error {
NoName,
NoNamespace,
UpdateSecret { source: kube::Error },
DeleteSecret { source: kube::Error },
}
type Result<T, E = Error> = std::result::Result<T, E>;

fn secret_name_for_configmap(cm: &ConfigMap) -> Result<String> {
Ok(format!("cm---{}", cm.metadata.name.as_deref().context(NoName)?))
}

async fn apply(cm: ConfigMap, secrets: &kube::Api<Secret>) -> Result<ReconcilerAction> {
println!("Reconciling {:?}", cm);
let secret_name = secret_name_for_configmap(&cm)?;
secrets
.patch(
&secret_name,
&PatchParams::apply("configmap-secret-syncer.nullable.se"),
&Patch::Apply(Secret {
metadata: ObjectMeta {
name: Some(secret_name.clone()),
..ObjectMeta::default()
},
string_data: cm.data,
data: cm.binary_data,
..Secret::default()
}),
)
.await
.context(UpdateSecret)?;
Ok(ReconcilerAction { requeue_after: None })
}

async fn cleanup(cm: ConfigMap, secrets: &kube::Api<Secret>) -> Result<ReconcilerAction> {
println!("Cleaning up {:?}", cm);
secrets
.delete(&secret_name_for_configmap(&cm)?, &DeleteParams::default())
.await
.map(|_| ())
.or_else(|err| match err {
// Object is already deleted
kube::Error::Api(ErrorResponse { code: 404, .. }) => Ok(()),
err => Err(err),
})
.context(DeleteSecret)?;
Ok(ReconcilerAction { requeue_after: None })
}

#[tokio::main]
async fn main() -> color_eyre::Result<()> {
env_logger::init();
let kube = kube::Client::try_default().await?;
let all_cms = kube::Api::<ConfigMap>::all(kube.clone());
Controller::new(
all_cms,
ListParams::default().labels("configmap-secret-syncer.nullable.se/sync=true"),
)
.run(
|cm, _| {
let ns = cm.meta().namespace.as_deref().context(NoNamespace).unwrap();
let cms: Api<ConfigMap> = Api::namespaced(kube.clone(), ns);
let secrets: Api<Secret> = Api::namespaced(kube.clone(), ns);
async move {
finalizer(
&cms,
"configmap-secret-syncer.nullable.se/cleanup",
cm,
|event| async {
match event {
Event::Apply(cm) => apply(cm, &secrets).await,
Event::Cleanup(cm) => cleanup(cm, &secrets).await,
}
},
)
.await
nightkr marked this conversation as resolved.
Show resolved Hide resolved
}
},
|_err, _| ReconcilerAction {
requeue_after: Some(Duration::from_secs(2)),
},
Context::new(()),
)
.for_each(|msg| async move { println!("Reconciled: {:?}", msg) })
.await;
Ok(())
}
9 changes: 9 additions & 0 deletions examples/secret_syncer_configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: secret-syncer-example
labels:
configmap-secret-syncer.nullable.se/sync: "true"
data:
foo: bar
spam: eggs
4 changes: 3 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ edition = "2018"

[dependencies]
futures = "0.3.8"
kube = { path = "../kube", version = "^0.57.0", default-features = false, features = ["client"] }
kube = { path = "../kube", version = "^0.57.0", default-features = false, features = ["jsonpatch", "client"] }
derivative = "2.1.1"
serde = "1.0.118"
smallvec = "1.6.0"
Expand All @@ -24,6 +24,8 @@ snafu = { version = "0.6.10", features = ["futures"] }
dashmap = "4.0.1"
tokio-util = { version = "0.6.0", features = ["time"] }
tracing = "0.1.26"
json-patch = "0.2.6"
serde_json = "1.0.64"

[dependencies.k8s-openapi]
version = "0.12.0"
Expand Down
210 changes: 210 additions & 0 deletions kube-runtime/src/finalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use crate::controller::ReconcilerAction;
use futures::{TryFuture, TryFutureExt};
use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation};
use kube::{
api::{Patch, PatchParams},
Api, Resource,
};
use serde::{de::DeserializeOwned, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{error::Error as StdError, fmt::Debug};

#[derive(Debug, Snafu)]
pub enum Error<ReconcileErr>
where
ReconcileErr: StdError + 'static,
{
#[snafu(display("failed to apply object: {}", source))]
ApplyFailed { source: ReconcileErr },
#[snafu(display("failed to clean up object: {}", source))]
CleanupFailed { source: ReconcileErr },
#[snafu(display("failed to add finalizer: {}", source))]
AddFinalizer { source: kube::Error },
#[snafu(display("failed to remove finalizer: {}", source))]
RemoveFinalizer { source: kube::Error },
#[snafu(display("object has no name"))]
UnnamedObject,
}

struct FinalizerState {
finalizer_index: Option<usize>,
is_deleting: bool,
}

impl FinalizerState {
fn for_object<K: Resource>(obj: &K, finalizer_name: &str) -> Self {
Self {
finalizer_index: obj
.meta()
.finalizers
.iter()
.enumerate()
.find(|(_, fin)| *fin == finalizer_name)
.map(|(i, _)| i),
is_deleting: obj.meta().deletion_timestamp.is_some(),
}
}
}

/// Reconcile an object in a way that requires cleanup before an object can be deleted. It does this by
/// managing a [`ObjectMeta::finalizers`] entry, which prevents the object from being deleted before the
/// cleanup is done.
///
/// In typical usage, if you use `finalizer` then it should be the only top-level "action"
/// in your [`applier`]/[`Controller`]'s `reconcile` function.
///
/// # Expected Flow
///
/// 1. User creates object
/// 2. Reconciler sees object
/// 3. `finalizer` adds `finalizer_name` to [`ObjectMeta::finalizers`]
/// 4. Reconciler sees updated object
/// 5. `finalizer` runs [`Event::Apply`]
/// 6. User updates object
/// 7. Reconciler sees updated object
/// 8. `finalizer` runs [`Event::Apply`]
/// 9. User deletes object
/// 10. Reconciler sees deleting object
/// 11. `finalizer` runs [`Event::Cleanup`]
/// 12. `finalizer` removes `finalizer_name` from [`ObjectMeta::finalizers`]
/// 13. Kubernetes sees that all [`ObjectMeta::finalizers`] are gone and finally deletes the object
///
/// # Guarantees
///
/// If [`Event::Apply`] is ever started then [`Event::Cleanup`] must succeed before the Kubernetes object deletion completes.
///
/// # Assumptions
///
/// `finalizer_name` must be unique among the controllers interacting with the object
///
/// [`Event::Apply`] and [`Event::Cleanup`] must both be idempotent, and tolerate being executed several times (even if previously cancelled).
///
/// [`Event::Cleanup`] must tolerate [`Event::Apply`] never having ran at all, or never having succeeded. Keep in mind that
/// even infallible `.await`s are cancellation points.
///
/// # Caveats
///
/// Object deletes will get stuck while the controller is not running, or if `cleanup` fails for some reason.
///
/// `reconcile` should take the object that the [`Event`] contains, rather than trying to reuse `obj`, since it may have been updated.
///
/// # Errors
///
/// [`Event::Apply`] and [`Event::Cleanup`] are both fallible, their errors are passed through as [`Error::ApplyFailed`]
/// and [`Error::CleanupFailed`], respectively.
///
/// In addition, adding and removing the finalizer itself may fail. In particular, this may be because of
/// network errors, lacking permissions, or because another `finalizer` was updated in the meantime on the same object.
pub async fn finalizer<K, ReconcileFut>(
api: &Api<K>,
finalizer_name: &str,
obj: K,
reconcile: impl FnOnce(Event<K>) -> ReconcileFut,
) -> Result<ReconcilerAction, Error<ReconcileFut::Error>>
where
K: Resource + Clone + DeserializeOwned + Serialize + Debug,
ReconcileFut: TryFuture<Ok = ReconcilerAction>,
ReconcileFut::Error: StdError + 'static,
{
match FinalizerState::for_object(&obj, finalizer_name) {
FinalizerState {
finalizer_index: Some(_),
is_deleting: false,
} => reconcile(Event::Apply(obj))
.into_future()
.await
.context(ApplyFailed),
FinalizerState {
finalizer_index: Some(finalizer_i),
is_deleting: true,
} => {
// Cleanup reconciliation must succeed before it's safe to remove the finalizer
let name = obj.meta().name.clone().context(UnnamedObject)?;
let action = reconcile(Event::Cleanup(obj))
.into_future()
.await
// Short-circuit, so that we keep the finalizer if cleanup fails
.context(CleanupFailed)?;
// Cleanup was successful, remove the finalizer so that deletion can continue
let finalizer_path = format!("/metadata/finalizers/{}", finalizer_i);
api.patch::<K>(
&name,
&PatchParams::default(),
&Patch::Json(json_patch::Patch(vec![
// All finalizers run concurrently and we use an integer index
// `Test` ensures that we fail instead of deleting someone else's finalizer
// (in which case a new `Cleanup` event will be sent)
PatchOperation::Test(TestOperation {
path: finalizer_path.clone(),
value: finalizer_name.into(),
}),
PatchOperation::Remove(RemoveOperation { path: finalizer_path }),
])),
)
.await
.context(RemoveFinalizer)?;
Ok(action)
}
FinalizerState {
finalizer_index: None,
is_deleting: false,
} => {
// Finalizer must be added before it's safe to run an `Apply` reconciliation
let patch = json_patch::Patch(if obj.meta().finalizers.is_empty() {
vec![
PatchOperation::Test(TestOperation {
path: "/metadata/finalizers".to_string(),
value: serde_json::Value::Null,
}),
PatchOperation::Add(AddOperation {
path: "/metadata/finalizers".to_string(),
value: vec![finalizer_name].into(),
}),
]
} else {
vec![PatchOperation::Add(AddOperation {
path: "/metadata/finalizers/-".to_string(),
value: finalizer_name.into(),
})]
});
api.patch::<K>(
obj.meta().name.as_deref().context(UnnamedObject)?,
&PatchParams::default(),
&Patch::Json(patch),
)
.await
.context(AddFinalizer)?;
// No point applying here, since the patch will cause a new reconciliation
Ok(ReconcilerAction { requeue_after: None })
}
FinalizerState {
finalizer_index: None,
is_deleting: true,
} => {
// Our work here is done
Ok(ReconcilerAction { requeue_after: None })
}
}
}

/// A representation of an action that should be taken by a reconciler.
pub enum Event<K> {
/// The reconciler should ensure that the actual state matches the state desired in the object.
///
/// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive):
///
/// - The controller is restarted
/// - The object is updated
/// - The reconciliation fails
/// - The grinch attacks
Apply(K),
/// The object is being deleted, and the reconciler should remove all resources that it owns.
///
/// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive):
///
/// - The controller is restarted while the deletion is in progress
/// - The reconciliation fails
/// - Another finalizer was removed in the meantime
/// - The grinch's heart grows a size or two
Cleanup(K),
}
2 changes: 2 additions & 0 deletions kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
#![allow(clippy::type_repetition_in_bounds)]

pub mod controller;
pub mod finalizer;
pub mod reflector;
pub mod scheduler;
pub mod utils;
pub mod watcher;

pub use controller::{applier, Controller};
pub use finalizer::finalizer;
pub use reflector::reflector;
pub use scheduler::scheduler;
pub use watcher::watcher;