Skip to content

Commit

Permalink
Run post-reconciliation requeue in the reconciler context
Browse files Browse the repository at this point in the history
This fixes kube-rs#926, since we already run multiple reconcilers in parallel.

Signed-off-by: Teo Klestrup Röijezon <[email protected]>
  • Loading branch information
nightkr committed Jun 8, 2022
1 parent da80f15 commit c25bd48
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 52 deletions.
2 changes: 1 addition & 1 deletion kube-derive/src/custom_resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,6 @@ mod tests {
struct FooSpec { foo: String }
};
let input = syn::parse2(input).unwrap();
let kube_attrs = KubeAttrs::from_derive_input(&input).unwrap();
let _kube_attrs = KubeAttrs::from_derive_input(&input).unwrap();
}
}
171 changes: 120 additions & 51 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ use derivative::Derivative;
use futures::{
channel,
future::{self, BoxFuture},
stream, Future, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube_client::api::{Api, DynamicObject, ListParams, Resource};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
task::Poll,
time::Duration,
};
use stream::BoxStream;
Expand Down Expand Up @@ -217,7 +219,7 @@ const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
Expand All @@ -232,23 +234,25 @@ where
QueueStream::Error: std::error::Error + 'static,
{
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
let err_context = context.clone();
let (scheduler_tx, scheduler_rx) =
channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
let error_policy = Arc::new(error_policy);
// Create a stream of ObjectRefs that need to be reconciled
trystream_try_via(
// input: stream combining scheduled tasks and user specified inputs event
Box::pin(stream::select(
// 1. inputs from users queue stream
queue.map_err(Error::QueueError).map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
queue
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
let _ = scheduler_shutdown_tx.send(());
tracing::debug!("applier queue terminated, starting graceful shutdown")
}),
// 2. requests sent to scheduler_tx
scheduler_rx
.map(Ok)
Expand All @@ -261,56 +265,121 @@ where
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let reconciler_span = info_span!("reconciling object", "object.ref" = %request.obj_ref, object.reason = %request.reason);
reconciler_span.in_scope(|| reconciler(obj, context.clone()))
.into_future()
.instrument(reconciler_span.clone())
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res, reconciler_span)))
.left_future()
},
None => future::err(
Error::ObjectNotFound(request.obj_ref.erase())
)
.right_future(),
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(obj, context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
PostReconciler::new(
res,
|err| error_policy(err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
// finally, for each completed reconcile call:
.and_then(move |(obj_ref, reconciler_result, reconciler_span)| {
let (Action { requeue_after }, requeue_reason) = match &reconciler_result {
Ok(action) =>
// do what user told us
(action.clone(), ReconcileReason::ReconcilerRequestedRetry),
Err(err) =>
// reconciler fn call failed
(reconciler_span.in_scope(|| error_policy(err, err_context.clone())), ReconcileReason::ErrorPolicyRequestedRetry),
};
let mut scheduler_tx = scheduler_tx.clone();
async move {
// Transmit the requeue request to the scheduler (picked up again at top)
if let Some(delay) = requeue_after {
// Failure to schedule item = in graceful shutdown mode, ignore
let _ = scheduler_tx
.send(ScheduleRequest {
message: ReconcileRequest {obj_ref: obj_ref.clone(), reason: requeue_reason},
run_at: Instant::now() + delay,
})
.await;
}
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase()))
}
.and_then(move |(obj_ref, reconciler_result)| async move {
match reconciler_result {
Ok(action) => Ok((obj_ref, action)),
Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
}
})
.on_complete(async { tracing::debug!("applier terminated") })
}

/// Internal helper that runs post-reconciliation (such as requesting rescheduling) tasks in the scheduled context of the reconciler
///
/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
#[pin_project]
#[must_use]
struct PostReconciler<K: Resource, ReconcilerErr> {
reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
result: Option<Result<Action, ReconcilerErr>>,
}

impl<K, ReconcilerErr> PostReconciler<K, ReconcilerErr>
where
K: Resource,
{
fn new(
result: Result<Action, ReconcilerErr>,
error_policy: impl FnOnce(&ReconcilerErr) -> Action,
obj_ref: ObjectRef<K>,
reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
) -> Self {
let reconciler_finished_at = Instant::now();

let (action, reschedule_reason) = result.as_ref().map_or_else(
|err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
|action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
);

Self {
reschedule_tx,
reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
message: ReconcileRequest {
obj_ref,
reason: reschedule_reason,
},
run_at: reconciler_finished_at + requeue_after,
}),
result: Some(result),
}
}
}

impl<K, ReconcilerErr> Future for PostReconciler<K, ReconcilerErr>
where
K: Resource,
{
type Output = Result<Action, ReconcilerErr>;

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

if this.reschedule_request.is_some() {
let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
let reschedule_request = this
.reschedule_request
.take()
.expect("PostReconciler::reschedule_request was taken during processing");
// Failure to schedule item = in graceful shutdown mode, ignore
if let Ok(()) = rescheduler_ready {
let _ = this.reschedule_tx.start_send(reschedule_request);
}
}

Poll::Ready(
this.result
.take()
.expect("PostReconciler::result was already taken"),
)
}
}

/// Controller
///
/// A controller is made up of:
Expand Down Expand Up @@ -739,7 +808,7 @@ where
pub fn run<ReconcilerFut, Ctx>(
self,
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
Expand Down
33 changes: 33 additions & 0 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,36 @@ pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
}

impl<S: Stream> KubeRuntimeStreamExt for S {}

#[cfg(test)]
mod tests {
use std::convert::Infallible;

use futures::stream::{self, StreamExt};

use super::trystream_try_via;

// Type-level test does not need to be executed
#[allow(dead_code)]
fn trystream_try_via_should_be_able_to_borrow() {
struct WeirdComplexObject {}
impl Drop for WeirdComplexObject {
fn drop(&mut self) {}
}

let mut x = WeirdComplexObject {};
let y = WeirdComplexObject {};
drop(trystream_try_via(
Box::pin(stream::once(async {
let _ = &mut x;
Result::<_, Infallible>::Ok(())
})),
|s| {
s.map(|_| {
let _ = &y;
Ok(())
})
},
));
}
}

0 comments on commit c25bd48

Please sign in to comment.