Merge pull request #932 from teozkr/bugfix/issue-926
Applier: Improve reconciler reschedule context to avoid deadlocking on full channel
nightkr authored Jun 9, 2022
2 parents d4c9f5d + 457bdb1 commit 12218ed
Showing 3 changed files with 231 additions and 58 deletions.
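
For background on the commit title: the failure class being avoided here is the bounded-channel feedback loop, where a task both drains a channel and requeues messages back into that same channel, and can therefore park itself inside a full-buffer send that only it could ever unblock. The snippet below is a minimal, hypothetical sketch of that pattern built on futures 0.3 mpsc channels; it is not code from this repository, and the names (requeue_feedback_deadlock, tx, rx) are invented. The diff that follows sidesteps this class of problem by performing the requeue inside the reconciler's own future, using a non-blocking poll_ready/start_send handshake against an explicitly bounded buffer.

use futures::{channel::mpsc, SinkExt, StreamExt};

// Hypothetical sketch, not code from this repository: a loop that requeues into
// its own bounded input channel while also being the only task draining it.
// Do not run this as-is; it deadlocks by design once the buffer fills.
async fn requeue_feedback_deadlock() {
    let (mut tx, mut rx) = mpsc::channel::<u32>(4);
    tx.send(0).await.expect("seeding an empty channel cannot fail");

    while let Some(item) = rx.next().await {
        // Each iteration removes one message but requeues two, so the buffer
        // eventually fills. The second send then waits for capacity, but the
        // only receiver is this loop, which is suspended inside that very send,
        // so nothing can ever make progress again.
        tx.send(item).await.expect("requeue");
        tx.send(item + 1).await.expect("requeue");
    }
}
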
2 changes: 1 addition & 1 deletion kube-derive/src/custom_resource.rs
@@ -599,6 +599,6 @@ mod tests {
struct FooSpec { foo: String }
};
let input = syn::parse2(input).unwrap();
let kube_attrs = KubeAttrs::from_derive_input(&input).unwrap();
let _kube_attrs = KubeAttrs::from_derive_input(&input).unwrap();
}
}
254 changes: 197 additions & 57 deletions kube-runtime/src/controller/mod.rs
@@ -16,14 +16,16 @@ use derivative::Derivative;
use futures::{
channel,
future::{self, BoxFuture},
stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube_client::api::{Api, DynamicObject, ListParams, Resource};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
task::Poll,
time::Duration,
};
use stream::BoxStream;
@@ -202,6 +204,8 @@ impl Display for ReconcileReason {
}
}

const APPLIER_REQUEUE_BUF_SIZE: usize = 100;

/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector`].
@@ -215,7 +219,7 @@ impl Display for ReconcileReason {
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
@@ -230,22 +234,25 @@ where
QueueStream::Error: std::error::Error + 'static,
{
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
let err_context = context.clone();
let (scheduler_tx, scheduler_rx) = channel::mpsc::unbounded::<ScheduleRequest<ReconcileRequest<K>>>();
let (scheduler_tx, scheduler_rx) =
channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
let error_policy = Arc::new(error_policy);
// Create a stream of ObjectRefs that need to be reconciled
trystream_try_via(
// input: stream combining scheduled tasks and user specified inputs event
Box::pin(stream::select(
// 1. inputs from users queue stream
queue.map_err(Error::QueueError).map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
queue
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
// 2. requests sent to scheduler_tx
scheduler_rx
.map(Ok)
@@ -258,56 +265,121 @@ where
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let reconciler_span = info_span!("reconciling object", "object.ref" = %request.obj_ref, object.reason = %request.reason);
reconciler_span.in_scope(|| reconciler(obj, context.clone()))
.into_future()
.instrument(reconciler_span.clone())
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res, reconciler_span)))
.left_future()
},
None => future::err(
Error::ObjectNotFound(request.obj_ref.erase())
)
.right_future(),
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(obj, context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
// finally, for each completed reconcile call:
.and_then(move |(obj_ref, reconciler_result, reconciler_span)| {
let (Action { requeue_after }, requeue_reason) = match &reconciler_result {
Ok(action) =>
// do what user told us
(action.clone(), ReconcileReason::ReconcilerRequestedRetry),
Err(err) =>
// reconciler fn call failed
(reconciler_span.in_scope(|| error_policy(err, err_context.clone())), ReconcileReason::ErrorPolicyRequestedRetry),
};
let mut scheduler_tx = scheduler_tx.clone();
async move {
// Transmit the requeue request to the scheduler (picked up again at top)
if let Some(delay) = requeue_after {
// Failure to schedule item = in graceful shutdown mode, ignore
let _ = scheduler_tx
.send(ScheduleRequest {
message: ReconcileRequest {obj_ref: obj_ref.clone(), reason: requeue_reason},
run_at: Instant::now() + delay,
})
.await;
}
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase()))
}
.and_then(move |(obj_ref, reconciler_result)| async move {
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
}
})
.on_complete(async { tracing::debug!("applier terminated") })
}

/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
///
/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
result: Option<Result<Action, ReconcilerErr>>,
}

impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
where
K: Resource,
{
fn new(
result: Result<Action, ReconcilerErr>,
error_policy: impl FnOnce(&ReconcilerErr) -> Action,
obj_ref: ObjectRef<K>,
reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
) -> Self {
let reconciler_finished_at = Instant::now();

let (action, reschedule_reason) = result.as_ref().map_or_else(
|err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
|action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
);

Self {
reschedule_tx,
reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
message: ReconcileRequest {
obj_ref,
reason: reschedule_reason,
},
run_at: reconciler_finished_at + requeue_after,
}),
result: Some(result),
}
}
}

impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
where
K: Resource,
{
type Output = Result<Action, ReconcilerErr>;

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

if this.reschedule_request.is_some() {
let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
let reschedule_request = this
.reschedule_request
.take()
.expect("PostReconciler::reschedule_request was taken during processing");
// Failure to schedule item = in graceful shutdown mode, ignore
if let Ok(()) = rescheduler_ready {
let _ = this.reschedule_tx.start_send(reschedule_request);
}
}

Poll::Ready(
this.result
.take()
.expect("PostReconciler::result was already taken"),
)
}
}

/// Controller
///
/// A controller is made up of:
@@ -736,7 +808,7 @@ where
pub fn run<ReconcilerFut, Ctx>(
self,
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
@@ -763,12 +835,18 @@ where

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::Action;
use crate::Controller;
use std::{convert::Infallible, sync::Arc, time::Duration};

use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
use crate::{
applier,
reflector::{self, ObjectRef},
watcher, Controller,
};
use futures::{pin_mut, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::Api;
use kube_client::{core::ObjectMeta, Api};
use tokio::time::timeout;

fn assert_send<T: Send>(x: T) -> T {
x
@@ -791,4 +869,66 @@ mod tests {
),
);
}

#[tokio::test]
async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
// This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
// This is intended to avoid regressing on https://github.com/kube-rs/kube-rs/issues/926

// Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 50x the number of objects "in rotation"
// On my 3900X (@teozkr) I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives
let items = APPLIER_REQUEUE_BUF_SIZE * 50;
// Assume that everything's OK if we can reconcile every object 3 times on average
let reconciles = items * 3;

let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
let (store_rx, mut store_tx) = reflector::store();
let applier = applier(
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer by just putting the object back in the queue immediately
println!("reconciling {:?}", obj.metadata.name);
Ok(Action::requeue(Duration::ZERO))
})
},
|_: &Infallible, _| todo!(),
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
);
pin_mut!(applier);
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
name: Some(format!("cm-{i}")),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}

timeout(
Duration::from_secs(10),
applier
.as_mut()
.take(reconciles)
.try_for_each(|_| async { Ok(()) }),
)
.await
.expect("test timeout expired, applier likely deadlocked")
.unwrap();

// Do an orderly shutdown to ensure that no individual reconcilers are stuck
drop(queue_tx);
timeout(
Duration::from_secs(10),
applier.try_for_each(|_| async { Ok(()) }),
)
.await
.expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
.unwrap();
}
}
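
One design note worth spelling out: the RescheduleReconciliation helper added above is a hand-written Future rather than an async fn because, as its doc comment says, it needs to be Unpin. The sketch below illustrates that distinction under standard Rust rules; none of these names come from the commit, and it is only meant as an illustration.

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

// An `async fn` returns an opaque, compiler-generated future. Because such a
// future may hold borrows across `.await` points, the compiler does not
// implement `Unpin` for it, and callers have to pin it before polling.
async fn generated_future() -> u32 {
    futures::future::ready(1).await
}

// A hand-written future whose fields are all `Unpin` is `Unpin` automatically,
// so it can be moved around and polled by combinators without extra pinning.
struct HandRolled(Option<u32>);

impl Future for HandRolled {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        // `Pin::get_mut` is available here only because `HandRolled: Unpin`.
        Poll::Ready(self.get_mut().0.take().expect("polled after completion"))
    }
}

fn assert_unpin<T: Unpin>(_: &T) {}

fn unpin_check() {
    assert_unpin(&HandRolled(Some(1)));
    // assert_unpin(&generated_future()); // does not compile: the generated future is !Unpin
}
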
33 changes: 33 additions & 0 deletions kube-runtime/src/utils/mod.rs
@@ -229,3 +229,36 @@ pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
}

impl<S: Stream> KubeRuntimeStreamExt for S {}

#[cfg(test)]
mod tests {
use std::convert::Infallible;

use futures::stream::{self, StreamExt};

use super::trystream_try_via;

// Type-level test does not need to be executed
#[allow(dead_code)]
fn trystream_try_via_should_be_able_to_borrow() {
struct WeirdComplexObject {}
impl Drop for WeirdComplexObject {
fn drop(&mut self) {}
}

let mut x = WeirdComplexObject {};
let y = WeirdComplexObject {};
drop(trystream_try_via(
Box::pin(stream::once(async {
let _ = &mut x;
Result::<_, Infallible>::Ok(())
})),
|s| {
s.map(|_| {
let _ = &y;
Ok(())
})
},
));
}
}
