Applier: Improve reconciler reschedule context to avoid deadlocking on full channel #932
Changes from 4 commits
@@ -16,14 +16,16 @@ use derivative::Derivative;
 use futures::{
     channel,
     future::{self, BoxFuture},
-    stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
+    ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
 };
 use kube_client::api::{Api, DynamicObject, ListParams, Resource};
+use pin_project::pin_project;
 use serde::de::DeserializeOwned;
 use std::{
     fmt::{Debug, Display},
     hash::Hash,
     sync::Arc,
+    task::Poll,
     time::Duration,
 };
 use stream::BoxStream;
@@ -202,6 +204,8 @@ impl Display for ReconcileReason {
     }
 }
 
+const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
+
 /// Apply a reconciler to an input stream, with a given retry policy
 ///
 /// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector`].
@@ -215,7 +219,7 @@ impl Display for ReconcileReason {
 /// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
 pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
     mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
-    mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
+    error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
     context: Arc<Ctx>,
     store: Store<K>,
     queue: QueueStream,
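For orientation (not part of the diff): wiring `applier` up by hand looks roughly like the regression test added at the bottom of this PR. A minimal sketch with made-up names, a no-op reconciler, and a manually fed queue, assuming the usual `kube_runtime` exports:

```rust
use std::{convert::Infallible, sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_runtime::{applier, controller::Action, reflector, reflector::ObjectRef};

async fn run_configmap_applier() {
    // Manual input queue; a real controller maps watcher events into ObjectRefs instead.
    let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
    // The store reader goes to the applier; the writer would normally be fed by a reflector.
    let (store_reader, _store_writer) = reflector::store::<ConfigMap>();

    queue_tx
        .unbounded_send(ObjectRef::new("some-config-map").within("default"))
        .unwrap();
    drop(queue_tx); // closing the queue triggers graceful shutdown once it is drained

    applier(
        |_obj: Arc<ConfigMap>, _ctx: Arc<()>| {
            Box::pin(async move {
                // reconcile logic goes here
                Ok::<_, Infallible>(Action::requeue(Duration::from_secs(300)))
            })
        },
        |_err: &Infallible, _ctx: Arc<()>| Action::await_change(),
        Arc::new(()),
        store_reader,
        queue_rx.map(Result::<_, Infallible>::Ok),
    )
    .for_each(|res| async move {
        // each item is a Result of (ObjectRef<ConfigMap>, Action) or an applier error
        let _ = res;
    })
    .await;
}
```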
@@ -230,22 +234,25 @@ where
     QueueStream::Error: std::error::Error + 'static,
 {
     let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
-    let err_context = context.clone();
-    let (scheduler_tx, scheduler_rx) = channel::mpsc::unbounded::<ScheduleRequest<ReconcileRequest<K>>>();
+    let (scheduler_tx, scheduler_rx) =
+        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
Some points I wanted to raise regarding the unbounded channels here, because I'm not sure about the full reasoning. It feels partly defensible to have unbounded queues: … But on the other hand, if I understand this queue correctly, it also serves as a limiter on the amount of parallelism in a controller? In that case, limiting it actually makes a lot of sense, because 10k objects being reconciled at once might DoS a third-party service. Is that a correct understanding?

Not quite; there are actually four different "queues" going on in the controller. This PR is only concerned with queue 4 (the reschedule channel), where we have no practical way to implement deduping.

So if I understand it right: … and the executor is going to work at its regular pace. So this means it is actually possible to run thousands of reconciles at the same time on re-lists currently? Is that something that is viable to bound at some point, somehow? In the …

Yes, and yes. Well, we could add additional constraints for when we actually start running a pending reconciliation. That's not implemented at the moment, on the assumption that you could "just" use a semaphore in your reconciler function. Doing the semaphoring in the executor (which has other benefits, like not having to allocate the task before it actually has a semaphore permit) shouldn't be too difficult either; the main problem there would be that …

Maybe it makes sense to refactor a lot of the applier's configuration into some kind of …

Probably, yeah.
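To illustrate the semaphore workaround mentioned above (not part of this PR): a rough sketch of capping reconcile parallelism from inside the reconciler itself using `tokio::sync::Semaphore`. The constant, function name, and reconcile body are placeholders:

```rust
use std::{convert::Infallible, sync::Arc, time::Duration};

use futures::future::BoxFuture;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_runtime::controller::Action;
use tokio::sync::Semaphore;

const MAX_CONCURRENT_RECONCILES: usize = 16; // placeholder limit

fn limited_reconciler(
    semaphore: Arc<Semaphore>,
) -> impl Fn(Arc<ConfigMap>, Arc<()>) -> BoxFuture<'static, Result<Action, Infallible>> {
    move |_obj, _ctx| {
        let semaphore = semaphore.clone();
        Box::pin(async move {
            // Holding the permit for the duration of the reconcile caps how many
            // reconciliations actually run at once, even if the runner starts more tasks.
            let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
            // ...actual reconcile logic goes here...
            Ok(Action::requeue(Duration::from_secs(300)))
        })
    }
}
```

This only bounds the user's reconcile body; the applier still allocates and polls the surrounding task, which is why executor-level limiting is discussed above.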
+    let error_policy = Arc::new(error_policy);
     // Create a stream of ObjectRefs that need to be reconciled
     trystream_try_via(
         // input: stream combining scheduled tasks and user specified inputs event
         Box::pin(stream::select(
             // 1. inputs from users queue stream
-            queue.map_err(Error::QueueError).map_ok(|request| ScheduleRequest {
-                message: request.into(),
-                run_at: Instant::now() + Duration::from_millis(1),
-            })
-            .on_complete(async move {
-                // On error: scheduler has already been shut down and there is nothing for us to do
-                let _ = scheduler_shutdown_tx.send(());
-                tracing::debug!("applier queue terminated, starting graceful shutdown")
-            }),
+            queue
+                .map_err(Error::QueueError)
+                .map_ok(|request| ScheduleRequest {
+                    message: request.into(),
+                    run_at: Instant::now() + Duration::from_millis(1),
+                })
+                .on_complete(async move {
+                    // On error: scheduler has already been shut down and there is nothing for us to do
+                    let _ = scheduler_shutdown_tx.send(());
+                    tracing::debug!("applier queue terminated, starting graceful shutdown")
+                }),
             // 2. requests sent to scheduler_tx
             scheduler_rx
                 .map(Ok)
@@ -258,56 +265,121 @@ where
             let request = request.clone();
             match store.get(&request.obj_ref) {
                 Some(obj) => {
-                    let reconciler_span = info_span!("reconciling object", "object.ref" = %request.obj_ref, object.reason = %request.reason);
-                    reconciler_span.in_scope(|| reconciler(obj, context.clone()))
-                        .into_future()
-                        .instrument(reconciler_span.clone())
-                        // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
-                        // to them separately
-                        .map(|res| Ok((request.obj_ref, res, reconciler_span)))
-                        .left_future()
-                },
-                None => future::err(
-                    Error::ObjectNotFound(request.obj_ref.erase())
-                )
-                .right_future(),
+                    let scheduler_tx = scheduler_tx.clone();
+                    let error_policy_ctx = context.clone();
+                    let error_policy = error_policy.clone();
+                    let reconciler_span = info_span!(
+                        "reconciling object",
+                        "object.ref" = %request.obj_ref,
+                        object.reason = %request.reason
+                    );
+                    reconciler_span
+                        .in_scope(|| reconciler(obj, context.clone()))
+                        .into_future()
+                        .then(move |res| {
+                            let error_policy = error_policy;
+                            PostReconciler::new(
+                                res,
+                                |err| error_policy(err, error_policy_ctx),
+                                request.obj_ref.clone(),
+                                scheduler_tx,
+                            )
+                            // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
+                            // to them separately
+                            .map(|res| Ok((request.obj_ref, res)))
+                        })
+                        .instrument(reconciler_span)
+                        .left_future()
+                }
+                None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
             }
         })
         .on_complete(async { tracing::debug!("applier runner terminated") })
     },
 )
 .on_complete(async { tracing::debug!("applier runner-merge terminated") })
 // finally, for each completed reconcile call:
-.and_then(move |(obj_ref, reconciler_result, reconciler_span)| {
-    let (Action { requeue_after }, requeue_reason) = match &reconciler_result {
-        Ok(action) =>
-            // do what user told us
-            (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
-        Err(err) =>
-            // reconciler fn call failed
-            (reconciler_span.in_scope(|| error_policy(err, err_context.clone())), ReconcileReason::ErrorPolicyRequestedRetry),
-    };
-    let mut scheduler_tx = scheduler_tx.clone();
-    async move {
-        // Transmit the requeue request to the scheduler (picked up again at top)
-        if let Some(delay) = requeue_after {
-            // Failure to schedule item = in graceful shutdown mode, ignore
-            let _ = scheduler_tx
-                .send(ScheduleRequest {
-                    message: ReconcileRequest {obj_ref: obj_ref.clone(), reason: requeue_reason},
-                    run_at: Instant::now() + delay,
-                })
-                .await;
-        }
-        match reconciler_result {
-            Ok(action) => Ok((obj_ref, action)),
-            Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase()))
-        }
+.and_then(move |(obj_ref, reconciler_result)| async move {
+    match reconciler_result {
+        Ok(action) => Ok((obj_ref, action)),
+        Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
+    }
 })
 .on_complete(async { tracing::debug!("applier terminated") })
 }
 
+/// Internal helper that runs post-reconciliation (such as requesting rescheduling) tasks in the scheduled context of the reconciler
+///
+/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
+#[pin_project]
+#[must_use]
+struct PostReconciler<K: Resource, ReconcilerErr> {
+    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
+
+    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
+    result: Option<Result<Action, ReconcilerErr>>,
+}
+
+impl<K, ReconcilerErr> PostReconciler<K, ReconcilerErr>
+where
+    K: Resource,
+{
+    fn new(
+        result: Result<Action, ReconcilerErr>,
+        error_policy: impl FnOnce(&ReconcilerErr) -> Action,
+        obj_ref: ObjectRef<K>,
+        reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
+    ) -> Self {
+        let reconciler_finished_at = Instant::now();
+
+        let (action, reschedule_reason) = result.as_ref().map_or_else(
+            |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
+            |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
+        );
+
+        Self {
+            reschedule_tx,
+            reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
+                message: ReconcileRequest {
+                    obj_ref,
+                    reason: reschedule_reason,
+                },
+                run_at: reconciler_finished_at + requeue_after,
+            }),
+            result: Some(result),
+        }
+    }
+}
+
+impl<K, ReconcilerErr> Future for PostReconciler<K, ReconcilerErr>
+where
+    K: Resource,
+{
+    type Output = Result<Action, ReconcilerErr>;
+
+    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        if this.reschedule_request.is_some() {
+            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
+            let reschedule_request = this
+                .reschedule_request
+                .take()
+                .expect("PostReconciler::reschedule_request was taken during processing");
+            // Failure to schedule item = in graceful shutdown mode, ignore
+            if let Ok(()) = rescheduler_ready {
+                let _ = this.reschedule_tx.start_send(reschedule_request);
+            }
+        }
+
+        Poll::Ready(
+            this.result
+                .take()
+                .expect("PostReconciler::result was already taken"),
+        )
+    }
+}
+
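To make the `Unpin` note above concrete (not part of the diff): an `async fn` version of the same helper could look roughly like the sketch below, but its anonymous future would not be `Unpin`, and it would also need the `SinkExt` import that this PR removes. Names here are illustrative:

```rust
// Hypothetical async-fn equivalent of PostReconciler (would not be Unpin);
// the body mirrors the requeue logic removed from the old applier above.
// Requires `use futures::SinkExt;` for `send`.
async fn post_reconcile<K: Resource, ReconcilerErr>(
    result: Result<Action, ReconcilerErr>,
    error_policy: impl FnOnce(&ReconcilerErr) -> Action,
    obj_ref: ObjectRef<K>,
    mut reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
) -> Result<Action, ReconcilerErr> {
    let reconciler_finished_at = Instant::now();
    let (action, reschedule_reason) = result.as_ref().map_or_else(
        |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
        |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
    );
    if let Some(requeue_after) = action.requeue_after {
        // SinkExt::send waits for channel capacity; failure to send = graceful shutdown, ignore
        let _ = reschedule_tx
            .send(ScheduleRequest {
                message: ReconcileRequest {
                    obj_ref,
                    reason: reschedule_reason,
                },
                run_at: reconciler_finished_at + requeue_after,
            })
            .await;
    }
    result
}
```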
 /// Controller
 ///
 /// A controller is made up of:
@@ -736,7 +808,7 @@ where
     pub fn run<ReconcilerFut, Ctx>(
         self,
         mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
-        error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
+        error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
         context: Arc<Ctx>,
     ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
     where
@@ -763,12 +835,18 @@ where
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::{convert::Infallible, sync::Arc, time::Duration};
 
-    use super::Action;
-    use crate::Controller;
+    use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
+    use crate::{
+        applier,
+        reflector::{self, ObjectRef},
+        watcher, Controller,
+    };
     use futures::{StreamExt, TryStreamExt};
     use k8s_openapi::api::core::v1::ConfigMap;
-    use kube_client::Api;
+    use kube_client::{core::ObjectMeta, Api};
+    use tokio::time::timeout;
 
     fn assert_send<T: Send>(x: T) -> T {
         x
@@ -791,4 +869,54 @@ mod tests {
             ),
         );
     }
+
+    #[tokio::test]
+    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
+        // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
+        // This is intended to avoid regressing on https://github.com/kube-rs/kube-rs/issues/926
+
+        // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 50x the number of objects "in rotation"
+        // On my (@teozkr) 3900X I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives
+        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
+        // Assume that everything's OK if we can reconcile every object 3 times on average
+        let reconciles = items * 3;
+
+        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
+        let (store_rx, mut store_tx) = reflector::store();
+        let applier = tokio::spawn(
+            applier(
+                |obj, _| {
+                    Box::pin(async move {
+                        // Try to flood the rescheduling buffer by just putting the object back in the queue immediately
+                        println!("reconciling {:?}", obj.metadata.name);
+                        Ok(Action::requeue(Duration::ZERO))
+                    })
+                },
+                |_: &Infallible, _| todo!(),
+                Arc::new(()),
+                store_rx,
+                queue_rx.map(Result::<_, Infallible>::Ok),
+            )
+            .take(reconciles)
+            .try_for_each(|_| async { Ok(()) }),
+        );
+        for i in 0..items {
+            let obj = ConfigMap {
+                metadata: ObjectMeta {
+                    name: Some(format!("cm-{i}")),
+                    namespace: Some("default".to_string()),
+                    ..Default::default()
+                },
+                ..Default::default()
+            };
+            store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
+            queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
+        }
+        // Keep the submission queue open to avoid going into graceful shutdown mode
+        timeout(Duration::from_secs(10), applier)
+            .await
+            .expect("test timeout expired, applier likely deadlocked")
+            .unwrap()
+            .unwrap();
+    }
 }
I think this is fixed in master.

Doesn't look like it; the only PR that wasn't included in this branch was #931, which didn't touch this.

Oh, my bad. It turns out I had a different fix for it (by testing more) in https://github.com/kube-rs/kube-rs/pull/924/files, but it hasn't been reviewed and thus hasn't made it into master.

Ahh.