From c25bd48fd01874937aed4ffe25daadb80beb0119 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 8 Jun 2022 16:00:45 +0200 Subject: [PATCH] Run post-reconciliation requeue in the reconciler context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fixes #926, since we already run multiple reconcilers in parallel. Signed-off-by: Teo Klestrup Röijezon --- kube-derive/src/custom_resource.rs | 2 +- kube-runtime/src/controller/mod.rs | 171 ++++++++++++++++++++--------- kube-runtime/src/utils/mod.rs | 33 ++++++ 3 files changed, 154 insertions(+), 52 deletions(-) diff --git a/kube-derive/src/custom_resource.rs b/kube-derive/src/custom_resource.rs index 20adff5fe..c4622b1e9 100644 --- a/kube-derive/src/custom_resource.rs +++ b/kube-derive/src/custom_resource.rs @@ -599,6 +599,6 @@ mod tests { struct FooSpec { foo: String } }; let input = syn::parse2(input).unwrap(); - let kube_attrs = KubeAttrs::from_derive_input(&input).unwrap(); + let _kube_attrs = KubeAttrs::from_derive_input(&input).unwrap(); } } diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 517e2881a..fe31483af 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -16,14 +16,16 @@ use derivative::Derivative; use futures::{ channel, future::{self, BoxFuture}, - stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, + ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, }; use kube_client::api::{Api, DynamicObject, ListParams, Resource}; +use pin_project::pin_project; use serde::de::DeserializeOwned; use std::{ fmt::{Debug, Display}, hash::Hash, sync::Arc, + task::Poll, time::Duration, }; use stream::BoxStream; @@ -217,7 +219,7 @@ const APPLIER_REQUEUE_BUF_SIZE: usize = 100; /// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more 
verbose. pub fn applier( mut reconciler: impl FnMut(Arc, Arc) -> ReconcilerFut, - mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc) -> Action, + error_policy: impl Fn(&ReconcilerFut::Error, Arc) -> Action, context: Arc, store: Store, queue: QueueStream, @@ -232,23 +234,25 @@ where QueueStream::Error: std::error::Error + 'static, { let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel(); - let err_context = context.clone(); let (scheduler_tx, scheduler_rx) = channel::mpsc::channel::>>(APPLIER_REQUEUE_BUF_SIZE); + let error_policy = Arc::new(error_policy); // Create a stream of ObjectRefs that need to be reconciled trystream_try_via( // input: stream combining scheduled tasks and user specified inputs event Box::pin(stream::select( // 1. inputs from users queue stream - queue.map_err(Error::QueueError).map_ok(|request| ScheduleRequest { - message: request.into(), - run_at: Instant::now() + Duration::from_millis(1), - }) - .on_complete(async move { - // On error: scheduler has already been shut down and there is nothing for us to do - let _ = scheduler_shutdown_tx.send(()); - tracing::debug!("applier queue terminated, starting graceful shutdown") - }), + queue + .map_err(Error::QueueError) + .map_ok(|request| ScheduleRequest { + message: request.into(), + run_at: Instant::now() + Duration::from_millis(1), + }) + .on_complete(async move { + // On error: scheduler has already been shut down and there is nothing for us to do + let _ = scheduler_shutdown_tx.send(()); + tracing::debug!("applier queue terminated, starting graceful shutdown") + }), // 2. 
requests sent to scheduler_tx scheduler_rx .map(Ok) @@ -261,19 +265,33 @@ where let request = request.clone(); match store.get(&request.obj_ref) { Some(obj) => { - let reconciler_span = info_span!("reconciling object", "object.ref" = %request.obj_ref, object.reason = %request.reason); - reconciler_span.in_scope(|| reconciler(obj, context.clone())) - .into_future() - .instrument(reconciler_span.clone()) - // Reconciler errors are OK from the applier's PoV, we need to apply the error policy - // to them separately - .map(|res| Ok((request.obj_ref, res, reconciler_span))) - .left_future() - }, - None => future::err( - Error::ObjectNotFound(request.obj_ref.erase()) - ) - .right_future(), + let scheduler_tx = scheduler_tx.clone(); + let error_policy_ctx = context.clone(); + let error_policy = error_policy.clone(); + let reconciler_span = info_span!( + "reconciling object", + "object.ref" = %request.obj_ref, + object.reason = %request.reason + ); + reconciler_span + .in_scope(|| reconciler(obj, context.clone())) + .into_future() + .then(move |res| { + let error_policy = error_policy; + PostReconciler::new( + res, + |err| error_policy(err, error_policy_ctx), + request.obj_ref.clone(), + scheduler_tx, + ) + // Reconciler errors are OK from the applier's PoV, we need to apply the error policy + // to them separately + .map(|res| Ok((request.obj_ref, res))) + }) + .instrument(reconciler_span) + .left_future() + } + None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(), } }) .on_complete(async { tracing::debug!("applier runner terminated") }) @@ -281,36 +299,87 @@ where ) .on_complete(async { tracing::debug!("applier runner-merge terminated") }) // finally, for each completed reconcile call: - .and_then(move |(obj_ref, reconciler_result, reconciler_span)| { - let (Action { requeue_after }, requeue_reason) = match &reconciler_result { - Ok(action) => - // do what user told us - (action.clone(), ReconcileReason::ReconcilerRequestedRetry), - Err(err) 
=> - // reconciler fn call failed - (reconciler_span.in_scope(|| error_policy(err, err_context.clone())), ReconcileReason::ErrorPolicyRequestedRetry), - }; - let mut scheduler_tx = scheduler_tx.clone(); - async move { - // Transmit the requeue request to the scheduler (picked up again at top) - if let Some(delay) = requeue_after { - // Failure to schedule item = in graceful shutdown mode, ignore - let _ = scheduler_tx - .send(ScheduleRequest { - message: ReconcileRequest {obj_ref: obj_ref.clone(), reason: requeue_reason}, - run_at: Instant::now() + delay, - }) - .await; - } - match reconciler_result { - Ok(action) => Ok((obj_ref, action)), - Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())) - } + .and_then(move |(obj_ref, reconciler_result)| async move { + match reconciler_result { + Ok(action) => Ok((obj_ref, action)), + Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())), } }) .on_complete(async { tracing::debug!("applier terminated") }) } +/// Internal helper that runs post-reconciliation (such as requesting rescheduling) tasks in the scheduled context of the reconciler +/// +/// This could be an `async fn`, but isn't because we want it to be [`Unpin`] +#[pin_project] +#[must_use] +struct PostReconciler<K: Resource, ReconcilerErr> { + reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>, + + reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>, + result: Option<Result<Action, ReconcilerErr>>, +} + +impl<K, ReconcilerErr> PostReconciler<K, ReconcilerErr> +where + K: Resource, +{ + fn new( + result: Result<Action, ReconcilerErr>, + error_policy: impl FnOnce(&ReconcilerErr) -> Action, + obj_ref: ObjectRef<K>, + reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>, + ) -> Self { + let reconciler_finished_at = Instant::now(); + + let (action, reschedule_reason) = result.as_ref().map_or_else( + |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry), + |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry), + ); + + Self { + reschedule_tx, + reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest { + message: ReconcileRequest { + obj_ref, + reason: 
reschedule_reason, + }, + run_at: reconciler_finished_at + requeue_after, + }), + result: Some(result), + } + } +} + +impl<K, ReconcilerErr> Future for PostReconciler<K, ReconcilerErr> +where + K: Resource, +{ + type Output = Result<Action, ReconcilerErr>; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { + let this = self.get_mut(); + + if this.reschedule_request.is_some() { + let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx)); + let reschedule_request = this + .reschedule_request + .take() + .expect("PostReconciler::reschedule_request was taken during processing"); + // Failure to schedule item = in graceful shutdown mode, ignore + if let Ok(()) = rescheduler_ready { + let _ = this.reschedule_tx.start_send(reschedule_request); + } + } + + Poll::Ready( + this.result + .take() + .expect("PostReconciler::result was already taken"), + ) + } +} + /// Controller /// /// A controller is made up of: @@ -739,7 +808,7 @@ where pub fn run( self, mut reconciler: impl FnMut(Arc, Arc) -> ReconcilerFut, - error_policy: impl FnMut(&ReconcilerFut::Error, Arc) -> Action, + error_policy: impl Fn(&ReconcilerFut::Error, Arc) -> Action, context: Arc, ) -> impl Stream, Action), Error>> where diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index ff0bf03a0..a1bbd3187 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -228,3 +228,36 @@ pub(crate) trait KubeRuntimeStreamExt: Stream + Sized { } impl<S: Stream> KubeRuntimeStreamExt for S {} + +#[cfg(test)] +mod tests { + use std::convert::Infallible; + + use futures::stream::{self, StreamExt}; + + use super::trystream_try_via; + + // Type-level test does not need to be executed + #[allow(dead_code)] + fn trystream_try_via_should_be_able_to_borrow() { + struct WeirdComplexObject {} + impl Drop for WeirdComplexObject { + fn drop(&mut self) {} + } + + let mut x = WeirdComplexObject {}; + let y = WeirdComplexObject {}; + drop(trystream_try_via( + Box::pin(stream::once(async { + let _ = &mut x; + 
Result::<_, Infallible>::Ok(()) + })), + |s| { + s.map(|_| { + let _ = &y; + Ok(()) + }) + }, + )); + } +}