Skip to content

Commit

Permalink
Merge pull request #475 from Appva/feature/finalizer
Browse files Browse the repository at this point in the history
Finalizer helper
  • Loading branch information
nightkr authored Jul 2, 2021
2 parents c84110f + 14d7e2c commit 856d6fc
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 1 deletion.
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,7 @@ required-features = ["native-tls", "rustls-tls"]
[[example]]
name = "custom_client_trace"
path = "custom_client_trace.rs"

[[example]]
name = "secret_syncer"
path = "secret_syncer.rs"
109 changes: 109 additions & 0 deletions examples/secret_syncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Demonstrates a controller some outside resource that it needs to clean up when the owner is deleted

// NOTE: This is designed to demonstrate how to use finalizers, but is not in itself a good use case for them.
// If you actually want to clean up other Kubernetes objects then you should use `ownerReferences` instead and let
// k8s garbage collect the children.

use futures::StreamExt;
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
use kube::{
api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource},
error::ErrorResponse,
Api,
};
use kube_runtime::{
controller::{Context, ReconcilerAction},
finalizer::{finalizer, Event},
Controller,
};
use snafu::{OptionExt, ResultExt, Snafu};
use std::time::Duration;

#[derive(Debug, Snafu)]
enum Error {
NoName,
NoNamespace,
UpdateSecret { source: kube::Error },
DeleteSecret { source: kube::Error },
}
type Result<T, E = Error> = std::result::Result<T, E>;

fn secret_name_for_configmap(cm: &ConfigMap) -> Result<String> {
Ok(format!("cm---{}", cm.metadata.name.as_deref().context(NoName)?))
}

async fn apply(cm: ConfigMap, secrets: &kube::Api<Secret>) -> Result<ReconcilerAction> {
println!("Reconciling {:?}", cm);
let secret_name = secret_name_for_configmap(&cm)?;
secrets
.patch(
&secret_name,
&PatchParams::apply("configmap-secret-syncer.nullable.se"),
&Patch::Apply(Secret {
metadata: ObjectMeta {
name: Some(secret_name.clone()),
..ObjectMeta::default()
},
string_data: cm.data,
data: cm.binary_data,
..Secret::default()
}),
)
.await
.context(UpdateSecret)?;
Ok(ReconcilerAction { requeue_after: None })
}

async fn cleanup(cm: ConfigMap, secrets: &kube::Api<Secret>) -> Result<ReconcilerAction> {
println!("Cleaning up {:?}", cm);
secrets
.delete(&secret_name_for_configmap(&cm)?, &DeleteParams::default())
.await
.map(|_| ())
.or_else(|err| match err {
// Object is already deleted
kube::Error::Api(ErrorResponse { code: 404, .. }) => Ok(()),
err => Err(err),
})
.context(DeleteSecret)?;
Ok(ReconcilerAction { requeue_after: None })
}

#[tokio::main]
async fn main() -> color_eyre::Result<()> {
env_logger::init();
let kube = kube::Client::try_default().await?;
let all_cms = kube::Api::<ConfigMap>::all(kube.clone());
Controller::new(
all_cms,
ListParams::default().labels("configmap-secret-syncer.nullable.se/sync=true"),
)
.run(
|cm, _| {
let ns = cm.meta().namespace.as_deref().context(NoNamespace).unwrap();
let cms: Api<ConfigMap> = Api::namespaced(kube.clone(), ns);
let secrets: Api<Secret> = Api::namespaced(kube.clone(), ns);
async move {
finalizer(
&cms,
"configmap-secret-syncer.nullable.se/cleanup",
cm,
|event| async {
match event {
Event::Apply(cm) => apply(cm, &secrets).await,
Event::Cleanup(cm) => cleanup(cm, &secrets).await,
}
},
)
.await
}
},
|_err, _| ReconcilerAction {
requeue_after: Some(Duration::from_secs(2)),
},
Context::new(()),
)
.for_each(|msg| async move { println!("Reconciled: {:?}", msg) })
.await;
Ok(())
}
9 changes: 9 additions & 0 deletions examples/secret_syncer_configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: secret-syncer-example
labels:
configmap-secret-syncer.nullable.se/sync: "true"
data:
foo: bar
spam: eggs
4 changes: 3 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ edition = "2018"

[dependencies]
futures = "0.3.8"
kube = { path = "../kube", version = "^0.57.0", default-features = false, features = ["client"] }
kube = { path = "../kube", version = "^0.57.0", default-features = false, features = ["jsonpatch", "client"] }
derivative = "2.1.1"
serde = "1.0.118"
smallvec = "1.6.0"
Expand All @@ -24,6 +24,8 @@ snafu = { version = "0.6.10", features = ["futures"] }
dashmap = "4.0.1"
tokio-util = { version = "0.6.0", features = ["time"] }
tracing = "0.1.26"
json-patch = "0.2.6"
serde_json = "1.0.64"

[dependencies.k8s-openapi]
version = "0.12.0"
Expand Down
210 changes: 210 additions & 0 deletions kube-runtime/src/finalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use crate::controller::ReconcilerAction;
use futures::{TryFuture, TryFutureExt};
use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation};
use kube::{
api::{Patch, PatchParams},
Api, Resource,
};
use serde::{de::DeserializeOwned, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{error::Error as StdError, fmt::Debug};

#[derive(Debug, Snafu)]
pub enum Error<ReconcileErr>
where
ReconcileErr: StdError + 'static,
{
#[snafu(display("failed to apply object: {}", source))]
ApplyFailed { source: ReconcileErr },
#[snafu(display("failed to clean up object: {}", source))]
CleanupFailed { source: ReconcileErr },
#[snafu(display("failed to add finalizer: {}", source))]
AddFinalizer { source: kube::Error },
#[snafu(display("failed to remove finalizer: {}", source))]
RemoveFinalizer { source: kube::Error },
#[snafu(display("object has no name"))]
UnnamedObject,
}

struct FinalizerState {
finalizer_index: Option<usize>,
is_deleting: bool,
}

impl FinalizerState {
fn for_object<K: Resource>(obj: &K, finalizer_name: &str) -> Self {
Self {
finalizer_index: obj
.meta()
.finalizers
.iter()
.enumerate()
.find(|(_, fin)| *fin == finalizer_name)
.map(|(i, _)| i),
is_deleting: obj.meta().deletion_timestamp.is_some(),
}
}
}

/// Reconcile an object in a way that requires cleanup before an object can be deleted. It does this by
/// managing a [`ObjectMeta::finalizers`] entry, which prevents the object from being deleted before the
/// cleanup is done.
///
/// In typical usage, if you use `finalizer` then it should be the only top-level "action"
/// in your [`applier`]/[`Controller`]'s `reconcile` function.
///
/// # Expected Flow
///
/// 1. User creates object
/// 2. Reconciler sees object
/// 3. `finalizer` adds `finalizer_name` to [`ObjectMeta::finalizers`]
/// 4. Reconciler sees updated object
/// 5. `finalizer` runs [`Event::Apply`]
/// 6. User updates object
/// 7. Reconciler sees updated object
/// 8. `finalizer` runs [`Event::Apply`]
/// 9. User deletes object
/// 10. Reconciler sees deleting object
/// 11. `finalizer` runs [`Event::Cleanup`]
/// 12. `finalizer` removes `finalizer_name` from [`ObjectMeta::finalizers`]
/// 13. Kubernetes sees that all [`ObjectMeta::finalizers`] are gone and finally deletes the object
///
/// # Guarantees
///
/// If [`Event::Apply`] is ever started then [`Event::Cleanup`] must succeed before the Kubernetes object deletion completes.
///
/// # Assumptions
///
/// `finalizer_name` must be unique among the controllers interacting with the object
///
/// [`Event::Apply`] and [`Event::Cleanup`] must both be idempotent, and tolerate being executed several times (even if previously cancelled).
///
/// [`Event::Cleanup`] must tolerate [`Event::Apply`] never having ran at all, or never having succeeded. Keep in mind that
/// even infallible `.await`s are cancellation points.
///
/// # Caveats
///
/// Object deletes will get stuck while the controller is not running, or if `cleanup` fails for some reason.
///
/// `reconcile` should take the object that the [`Event`] contains, rather than trying to reuse `obj`, since it may have been updated.
///
/// # Errors
///
/// [`Event::Apply`] and [`Event::Cleanup`] are both fallible, their errors are passed through as [`Error::ApplyFailed`]
/// and [`Error::CleanupFailed`], respectively.
///
/// In addition, adding and removing the finalizer itself may fail. In particular, this may be because of
/// network errors, lacking permissions, or because another `finalizer` was updated in the meantime on the same object.
pub async fn finalizer<K, ReconcileFut>(
api: &Api<K>,
finalizer_name: &str,
obj: K,
reconcile: impl FnOnce(Event<K>) -> ReconcileFut,
) -> Result<ReconcilerAction, Error<ReconcileFut::Error>>
where
K: Resource + Clone + DeserializeOwned + Serialize + Debug,
ReconcileFut: TryFuture<Ok = ReconcilerAction>,
ReconcileFut::Error: StdError + 'static,
{
match FinalizerState::for_object(&obj, finalizer_name) {
FinalizerState {
finalizer_index: Some(_),
is_deleting: false,
} => reconcile(Event::Apply(obj))
.into_future()
.await
.context(ApplyFailed),
FinalizerState {
finalizer_index: Some(finalizer_i),
is_deleting: true,
} => {
// Cleanup reconciliation must succeed before it's safe to remove the finalizer
let name = obj.meta().name.clone().context(UnnamedObject)?;
let action = reconcile(Event::Cleanup(obj))
.into_future()
.await
// Short-circuit, so that we keep the finalizer if cleanup fails
.context(CleanupFailed)?;
// Cleanup was successful, remove the finalizer so that deletion can continue
let finalizer_path = format!("/metadata/finalizers/{}", finalizer_i);
api.patch::<K>(
&name,
&PatchParams::default(),
&Patch::Json(json_patch::Patch(vec![
// All finalizers run concurrently and we use an integer index
// `Test` ensures that we fail instead of deleting someone else's finalizer
// (in which case a new `Cleanup` event will be sent)
PatchOperation::Test(TestOperation {
path: finalizer_path.clone(),
value: finalizer_name.into(),
}),
PatchOperation::Remove(RemoveOperation { path: finalizer_path }),
])),
)
.await
.context(RemoveFinalizer)?;
Ok(action)
}
FinalizerState {
finalizer_index: None,
is_deleting: false,
} => {
// Finalizer must be added before it's safe to run an `Apply` reconciliation
let patch = json_patch::Patch(if obj.meta().finalizers.is_empty() {
vec![
PatchOperation::Test(TestOperation {
path: "/metadata/finalizers".to_string(),
value: serde_json::Value::Null,
}),
PatchOperation::Add(AddOperation {
path: "/metadata/finalizers".to_string(),
value: vec![finalizer_name].into(),
}),
]
} else {
vec![PatchOperation::Add(AddOperation {
path: "/metadata/finalizers/-".to_string(),
value: finalizer_name.into(),
})]
});
api.patch::<K>(
obj.meta().name.as_deref().context(UnnamedObject)?,
&PatchParams::default(),
&Patch::Json(patch),
)
.await
.context(AddFinalizer)?;
// No point applying here, since the patch will cause a new reconciliation
Ok(ReconcilerAction { requeue_after: None })
}
FinalizerState {
finalizer_index: None,
is_deleting: true,
} => {
// Our work here is done
Ok(ReconcilerAction { requeue_after: None })
}
}
}

/// A representation of an action that should be taken by a reconciler.
pub enum Event<K> {
/// The reconciler should ensure that the actual state matches the state desired in the object.
///
/// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive):
///
/// - The controller is restarted
/// - The object is updated
/// - The reconciliation fails
/// - The grinch attacks
Apply(K),
/// The object is being deleted, and the reconciler should remove all resources that it owns.
///
/// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive):
///
/// - The controller is restarted while the deletion is in progress
/// - The reconciliation fails
/// - Another finalizer was removed in the meantime
/// - The grinch's heart grows a size or two
Cleanup(K),
}
2 changes: 2 additions & 0 deletions kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
#![allow(clippy::type_repetition_in_bounds)]

pub mod controller;
pub mod finalizer;
pub mod reflector;
pub mod scheduler;
pub mod utils;
pub mod watcher;

pub use controller::{applier, Controller};
pub use finalizer::finalizer;
pub use reflector::reflector;
pub use scheduler::scheduler;
pub use watcher::watcher;

0 comments on commit 856d6fc

Please sign in to comment.