From fe94feebf66f9b380350f96d630c731eef2f3c06 Mon Sep 17 00:00:00 2001
From: Sanskar Jaiswal
Date: Tue, 25 Jul 2023 18:04:59 +0530
Subject: [PATCH] add `controller::Config` and debounce period to scheduler

Add `controller::Config` to allow configuring the behavior of the
controller. Introduce a debounce period for the scheduler to allow for
deduplication of requests. By default, the debounce period is set to 1
second.

Signed-off-by: Sanskar Jaiswal
---
 examples/configmapgen_controller.rs   |   5 +-
 kube-runtime/src/controller/mod.rs    | 127 ++++++++++++++++----------
 kube-runtime/src/controller/runner.rs |   9 +-
 kube-runtime/src/scheduler.rs         |  88 ++++++++++++++++--
 4 files changed, 170 insertions(+), 59 deletions(-)

diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs
index 20422be74..130108686 100644
--- a/examples/configmapgen_controller.rs
+++ b/examples/configmapgen_controller.rs
@@ -7,7 +7,7 @@ use k8s_openapi::api::core::v1::ConfigMap;
 use kube::{
     api::{Api, ObjectMeta, Patch, PatchParams, Resource},
     runtime::{
-        controller::{Action, Controller},
+        controller::{Action, Config, Controller},
         watcher,
     },
     Client, CustomResource,
@@ -102,8 +102,11 @@ async fn main() -> Result<()> {
         }
     });
 
+    let mut config = Config::default();
+    config.debounce(Duration::from_secs(2));
     Controller::new(cmgs, watcher::Config::default())
         .owns(cms, watcher::Config::default())
+        .with_config(config)
         .reconcile_all_on(reload_rx.map(|_| ()))
         .shutdown_on_signal()
         .run(reconcile, error_policy, Arc::new(Data { client }))
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 80ed72fb9..7bb16ddd6 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -9,7 +9,7 @@ use crate::{
     },
     scheduler::{scheduler, ScheduleRequest},
     utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
-    watcher::{self, metadata_watcher, watcher, Config, DefaultBackoff},
+    watcher::{self, metadata_watcher, watcher, DefaultBackoff},
 };
 use backoff::backoff::Backoff;
 use derivative::Derivative;
@@ -234,6 +234,7 @@ impl Display for ReconcileReason {
 }
 
 const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
+const SCHEDULER_DEBOUNCE_PERIOD: Duration = Duration::from_secs(1);
 
 /// Apply a reconciler to an input stream, with a given retry policy
 ///
@@ -252,6 +253,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
     context: Arc<Ctx>,
     store: Store<K>,
     queue: QueueStream,
+    debounce: Option<Duration>,
 ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
 where
     K: Clone + Resource + 'static,
@@ -276,7 +278,7 @@ where
                 .map_err(Error::QueueError)
                 .map_ok(|request| ScheduleRequest {
                     message: request.into(),
-                    run_at: Instant::now() + Duration::from_millis(1),
+                    run_at: Instant::now(),
                 })
                 .on_complete(async move {
                     // On error: scheduler has already been shut down and there is nothing for us to do
@@ -291,39 +293,42 @@ where
         )),
         // all the Oks from the select gets passed through the scheduler stream, and are then executed
         move |s| {
-            Runner::new(scheduler(s), move |request| {
-                let request = request.clone();
-                match store.get(&request.obj_ref) {
-                    Some(obj) => {
-                        let scheduler_tx = scheduler_tx.clone();
-                        let error_policy_ctx = context.clone();
-                        let error_policy = error_policy.clone();
-                        let reconciler_span = info_span!(
-                            "reconciling object",
-                            "object.ref" = %request.obj_ref,
-                            object.reason = %request.reason
-                        );
-                        reconciler_span
-                            .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
-                            .into_future()
-                            .then(move |res| {
-                                let error_policy = error_policy;
-                                RescheduleReconciliation::new(
-                                    res,
-                                    |err| error_policy(obj, err, error_policy_ctx),
-                                    request.obj_ref.clone(),
-                                    scheduler_tx,
-                                )
-                                // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
-                                // to them separately
-                                .map(|res| Ok((request.obj_ref, res)))
-                            })
-                            .instrument(reconciler_span)
-                            .left_future()
-                    }
-                    None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
-                }
-            })
+            Runner::new(
+                scheduler(s, debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))),
+                move |request| {
+                    let request = request.clone();
+                    match store.get(&request.obj_ref) {
+                        Some(obj) => {
+                            let scheduler_tx = scheduler_tx.clone();
+                            let error_policy_ctx = context.clone();
+                            let error_policy = error_policy.clone();
+                            let reconciler_span = info_span!(
+                                "reconciling object",
+                                "object.ref" = %request.obj_ref,
+                                object.reason = %request.reason
+                            );
+                            reconciler_span
+                                .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
+                                .into_future()
+                                .then(move |res| {
+                                    let error_policy = error_policy;
+                                    RescheduleReconciliation::new(
+                                        res,
+                                        |err| error_policy(obj, err, error_policy_ctx),
+                                        request.obj_ref.clone(),
+                                        scheduler_tx,
+                                    )
+                                    // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
+                                    // to them separately
+                                    .map(|res| Ok((request.obj_ref, res)))
+                                })
+                                .instrument(reconciler_span)
+                                .left_future()
+                        }
+                        None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
+                    }
+                },
+            )
             .delay_tasks_until(async move {
                 tracing::debug!("applier runner held until store is ready");
                 let res = delay_store.wait_until_ready().await;
@@ -417,6 +422,22 @@ where
     }
 }
 
+/// Config contains all the options that can be used to configure
+/// the behavior of the controller.
+#[derive(Clone, Debug, Default)]
+pub struct Config {
+    /// The debounce time that allows for deduplication of events, preventing
+    /// unnecessary reconciliations. By default, it is set to 1 second, but users
+    /// should modify it according to the needs of their controller.
+    debounce: Option<Duration>,
+}
+
+impl Config {
+    pub fn debounce(&mut self, debounce: Duration) {
+        self.debounce = Some(debounce);
+    }
+}
+
 /// Controller for a Resource `K`
 ///
 /// A controller is an infinite stream of objects to be reconciled.
@@ -505,6 +526,7 @@ where
     forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
     dyntype: K::DynamicType,
     reader: Store<K>,
+    config: Config,
 }
 
 impl<K> Controller<K>
@@ -516,11 +538,11 @@ where
     ///
     /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
     ///
-    /// The [`Config`] controls to the possible subset of objects of `K` that you want to manage
+    /// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
     /// and receive reconcile events for.
-    /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
+    /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
     #[must_use]
-    pub fn new(main_api: Api<K>, wc: Config) -> Self
+    pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
     where
         K::DynamicType: Default,
     {
@@ -531,17 +553,17 @@ where
     ///
     /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
     ///
-    /// The [`Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
+    /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
     /// to watch - in the Api's configured scope - and receive reconcile events for.
     /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
     ///
     /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
     ///
-    /// [`Config`]: crate::watcher::Config
+    /// [`watcher::Config`]: crate::watcher::Config
     /// [`Api`]: kube_client::Api
     /// [`dynamic`]: kube_client::core::dynamic
     /// [`Config::default`]: crate::watcher::Config::default
-    pub fn new_with(main_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
+    pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
         let writer = Writer::<K>::new(dyntype.clone());
         let reader = writer.as_reader();
         let mut trigger_selector = stream::SelectAll::new();
@@ -564,6 +586,7 @@ where
             ],
             dyntype,
             reader,
+            config: Default::default(),
         }
     }
 
@@ -649,9 +672,17 @@ where
             ],
             dyntype,
             reader,
+            config: Default::default(),
         }
     }
 
+    /// Specify the configuration for the controller's behavior.
+    #[must_use]
+    pub fn with_config(mut self, config: Config) -> Self {
+        self.config = config;
+        self
+    }
+
     /// Specify the backoff policy for "trigger" watches
     ///
     /// This includes the core watch, as well as auxilary watches introduced by [`Self::owns`] and [`Self::watches`].
@@ -683,7 +714,7 @@ where
     pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
         self,
         api: Api<Child>,
-        wc: Config,
+        wc: watcher::Config,
     ) -> Self {
         self.owns_with(api, (), wc)
     }
@@ -696,7 +727,7 @@ where
         mut self,
         api: Api<Child>,
         dyntype: Child::DynamicType,
-        wc: Config,
+        wc: watcher::Config,
     ) -> Self
     where
         Child::DynamicType: Debug + Eq + Hash + Clone,
@@ -847,7 +878,7 @@ where
     pub fn watches<Other, I>(
         self,
         api: Api<Other>,
-        wc: Config,
+        wc: watcher::Config,
         mapper: impl Fn(Other) -> I + Sync + Send + 'static,
     ) -> Self
     where
@@ -867,7 +898,7 @@ where
         mut self,
         api: Api<Other>,
         dyntype: Other::DynamicType,
-        wc: Config,
+        wc: watcher::Config,
         mapper: impl Fn(Other) -> I + Sync + Send + 'static,
     ) -> Self
     where
@@ -1214,6 +1245,7 @@ where
             self.reader,
             StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
                 .take_until(future::select_all(self.graceful_shutdown_selector)),
+            self.config.debounce,
         )
         .take_until(futures::future::select_all(self.forceful_shutdown_selector))
     }
@@ -1298,15 +1330,18 @@ mod tests {
         let applier = applier(
             |obj, _| {
                 Box::pin(async move {
-                    // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
+                    // Try to flood the rescheduling buffer by just putting it back in the queue
+                    // almost immediately, but making sure it's after the debounce time, so that the
+                    // scheduler actually runs the request.
                     println!("reconciling {:?}", obj.metadata.name);
-                    Ok(Action::requeue(Duration::ZERO))
+                    Ok(Action::requeue(Duration::from_millis(2)))
                 })
             },
             |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
             Arc::new(()),
             store_rx,
             queue_rx.map(Result::<_, Infallible>::Ok),
+            Some(Duration::from_millis(1)),
         );
         pin_mut!(applier);
         for i in 0..items {
diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs
index 02fe35a74..50210bbea 100644
--- a/kube-runtime/src/controller/runner.rs
+++ b/kube-runtime/src/controller/runner.rs
@@ -158,7 +158,9 @@ mod tests {
         let mut count = 0;
         let (mut sched_tx, sched_rx) = mpsc::unbounded();
         let mut runner = Box::pin(
-            Runner::new(scheduler(sched_rx), |_| {
+            // The debounce period needs to be zero because otherwise the scheduler has a default
+            // debounce period of 1 ms, which will lead to the second request being discarded.
+            Runner::new(scheduler(sched_rx, Some(Duration::ZERO)), |_| {
                 count += 1;
                 // Panic if this ref is already held, to simulate some unsafe action..
                 let mutex_ref = rc.borrow_mut();
@@ -203,7 +205,7 @@ mod tests {
         // pause();
         let (mut sched_tx, sched_rx) = mpsc::unbounded();
         let (result_tx, result_rx) = oneshot::channel();
-        let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
+        let mut runner = Runner::new(scheduler(sched_rx, None), |msg: &u8| futures::future::ready(*msg));
         // Start a background task that starts listening /before/ we enqueue the message
         // We can't just use Stream::poll_next(), since that bypasses the waker system
         Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
@@ -242,6 +244,7 @@ mod tests {
                     run_at: Instant::now(),
                 }])
                 .chain(stream::pending()),
+                None,
             ),
             |msg| {
                 assert!(*is_ready.lock().unwrap());
@@ -278,6 +281,7 @@ mod tests {
                     },
                 ])
                 .chain(stream::pending()),
+                None,
             ),
             |msg| {
                 assert!(*is_ready.lock().unwrap());
@@ -313,6 +317,7 @@ mod tests {
                     run_at: Instant::now(),
                 }])
                 .chain(stream::pending()),
+                None,
             ),
             |()| {
                 panic!("run_msg should never be invoked if readiness gate fails");
diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs
index 7ff1cd2e1..abf81a34a 100644
--- a/kube-runtime/src/scheduler.rs
+++ b/kube-runtime/src/scheduler.rs
@@ -8,6 +8,7 @@ use std::{
     hash::Hash,
     pin::Pin,
     task::{Context, Poll},
+    time::Duration,
 };
 use tokio::time::Instant;
 use tokio_util::time::delay_queue::{self, DelayQueue};
@@ -44,15 +45,18 @@ pub struct Scheduler<T, R> {
     /// Incoming queue of scheduling requests.
     #[pin]
     requests: Fuse<R>,
+    /// Debounce time to allow for deduplication of requests.
+    debounce: Duration,
 }
 
 impl<T, R: Stream> Scheduler<T, R> {
-    fn new(requests: R) -> Self {
+    fn new(requests: R, debounce: Duration) -> Self {
         Self {
             queue: DelayQueue::new(),
             scheduled: HashMap::new(),
             pending: HashSet::new(),
             requests: requests.fuse(),
+            debounce,
         }
     }
 }
@@ -67,12 +71,15 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
             return;
         }
         match self.scheduled.entry(request.message) {
+            // If the new request is supposed to be earlier than the current entry's scheduled
+            // time (e.g. the new request is user-triggered and the current entry is the
+            // reconciler's usual retry), then give priority to the new request.
            Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
                 // Old entry will run after the new request, so replace it..
                 let entry = old_entry.get_mut();
-                // TODO: this should add a little delay here to actually debounce
-                self.queue.reset_at(&entry.queue_key, request.run_at);
-                entry.run_at = request.run_at;
+                self.queue
+                    .reset_at(&entry.queue_key, request.run_at + *self.debounce);
+                entry.run_at = request.run_at + *self.debounce;
                 old_entry.replace_key();
             }
             Entry::Occupied(_old_entry) => {
@@ -82,8 +89,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
                 // No old entry, we're free to go!
                 let message = entry.key().clone();
                 entry.insert(ScheduledEntry {
-                    run_at: request.run_at,
-                    queue_key: self.queue.insert_at(message, request.run_at),
+                    run_at: request.run_at + *self.debounce,
+                    queue_key: self.queue.insert_at(message, request.run_at + *self.debounce),
                 });
             }
         }
@@ -202,8 +209,11 @@ where
 /// is ready for it).
 ///
 /// The [`Scheduler`] terminates as soon as `requests` does.
-pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> {
-    Scheduler::new(requests)
+pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
+    requests: S,
+    debounce: Option<Duration>,
+) -> Scheduler<T, S> {
+    Scheduler::new(requests, debounce.unwrap_or(Duration::ZERO))
 }
 
 #[cfg(test)]
@@ -238,6 +248,7 @@ mod tests {
                 run_at: Instant::now(),
             }])
             .on_complete(sleep(Duration::from_secs(4))),
+            None,
         ));
         assert!(!scheduler.contains_pending(&1));
         assert!(poll!(scheduler.as_mut().hold_unless(|_| false).next()).is_pending());
@@ -254,7 +265,7 @@ mod tests {
     async fn scheduler_should_not_reschedule_pending_items() {
         pause();
         let (mut tx, rx) = mpsc::unbounded::<ScheduleRequest<i32>>();
-        let mut scheduler = Box::pin(scheduler(rx));
+        let mut scheduler = Box::pin(scheduler(rx, None));
         tx.send(ScheduleRequest {
             message: 1,
             run_at: Instant::now(),
@@ -295,6 +306,7 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(2))),
+            None,
         ));
         assert_eq!(
             scheduler.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(),
@@ -317,6 +329,7 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
+            None,
         );
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
@@ -344,6 +357,7 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
+            None,
         );
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
@@ -368,6 +382,7 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
+            None,
         );
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
@@ -381,7 +396,7 @@ mod tests {
     async fn scheduler_dedupe_should_allow_rescheduling_emitted_item() {
         pause();
         let (mut schedule_tx, schedule_rx) = mpsc::unbounded();
-        let mut scheduler = scheduler(schedule_rx);
+        let mut scheduler = scheduler(schedule_rx, None);
         schedule_tx
             .send(ScheduleRequest {
                 message: (),
@@ -423,6 +438,7 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
+            None,
         );
         assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![2]);
     }
@@ -444,7 +460,59 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
+            None,
         );
         assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
     }
+
+    #[tokio::test]
+    async fn scheduler_should_add_debounce_to_a_request() {
+        pause();
+
+        let now = Instant::now();
+        let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
+        let mut scheduler = scheduler(sched_rx, Some(Duration::from_secs(2)));
+
+        sched_tx
+            .send(ScheduleRequest {
+                message: SingletonMessage(1),
+                run_at: now,
+            })
+            .await
+            .unwrap();
+        assert!(poll!(scheduler.next()).is_pending());
+        advance(Duration::from_secs(3)).await;
+        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 1);
+    }
+
+    #[tokio::test]
+    async fn scheduler_should_dedup_message_within_debounce_period() {
+        pause();
+
+        let now = Instant::now();
+        let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
+        let mut scheduler = scheduler(sched_rx, Some(Duration::from_secs(2)));
+
+        sched_tx
+            .send(ScheduleRequest {
+                message: SingletonMessage(1),
+                run_at: now,
+            })
+            .await
+            .unwrap();
+        assert!(poll!(scheduler.next()).is_pending());
+        advance(Duration::from_secs(1)).await;
+
+        sched_tx
+            .send(ScheduleRequest {
+                message: SingletonMessage(2),
+                run_at: now,
+            })
+            .await
+            .unwrap();
+        assert!(poll!(scheduler.next()).is_pending());
+        advance(Duration::from_secs(3)).await;
+        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 2);
+        assert!(poll!(scheduler.next()).is_pending());
+    }
 }
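
Editor's note (not part of the patch): the sketch below shows how a downstream controller binary might opt into the debounce knob added here, via the new `Config::debounce` setter and `Controller::with_config`. It is a minimal example assuming the kube/kube-runtime version this patch targets, plus `tokio`, `futures`, `k8s-openapi`, and `thiserror` as dependencies; `ReconcileError`, the trivial `reconcile`/`error_policy` bodies, and the 500ms figure are illustrative placeholders, not something the patch prescribes.

    // Sketch only: watches ConfigMaps with a custom debounce period.
    use std::{sync::Arc, time::Duration};

    use futures::StreamExt;
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube::{
        api::Api,
        runtime::{
            controller::{Action, Config, Controller},
            watcher,
        },
        Client,
    };

    // Placeholder error type; any std::error::Error works as the reconciler error.
    #[derive(Debug, thiserror::Error)]
    #[error("reconcile error")]
    struct ReconcileError;

    // Deliberately trivial reconciler: the point of the sketch is the debounce wiring.
    async fn reconcile(_obj: Arc<ConfigMap>, _ctx: Arc<()>) -> Result<Action, ReconcileError> {
        Ok(Action::requeue(Duration::from_secs(300)))
    }

    fn error_policy(_obj: Arc<ConfigMap>, _err: &ReconcileError, _ctx: Arc<()>) -> Action {
        Action::requeue(Duration::from_secs(5))
    }

    #[tokio::main]
    async fn main() -> Result<(), kube::Error> {
        let client = Client::try_default().await?;
        let cms: Api<ConfigMap> = Api::default_namespaced(client);

        // Collapse triggers for the same object arriving within 500ms into a single
        // reconciliation, instead of relying on the applier's 1 second default.
        let mut config = Config::default();
        config.debounce(Duration::from_millis(500));

        Controller::new(cms, watcher::Config::default())
            .with_config(config)
            .shutdown_on_signal()
            .run(reconcile, error_policy, Arc::new(()))
            .for_each(|res| async move {
                if let Err(err) = res {
                    eprintln!("reconcile failed: {err:?}");
                }
            })
            .await;
        Ok(())
    }

Requests for the same object that arrive within the configured debounce window are deduplicated by the scheduler into a single run, which is the behavior the new scheduler tests above exercise.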