From 77ab6ce1ea79d516058f39a70305957493ea5f46 Mon Sep 17 00:00:00 2001
From: Sanskar Jaiswal
Date: Tue, 25 Jul 2023 18:04:59 +0530
Subject: [PATCH] add `controller::Config` and debounce period to scheduler

Add `controller::Config` to allow configuring the behavior of the
controller. Introduce a debounce period for the scheduler to allow for
deduplication of requests. By default, the debounce period is set to 1
second.

Signed-off-by: Sanskar Jaiswal
---
 examples/configmapgen_controller.rs   |   5 +-
 kube-runtime/src/controller/mod.rs    |  59 +++++++++++----
 kube-runtime/src/controller/runner.rs |   9 ++-
 kube-runtime/src/scheduler.rs         | 104 ++++++++++++++++++++++++--
 4 files changed, 152 insertions(+), 25 deletions(-)

diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs
index 20422be74..239e60391 100644
--- a/examples/configmapgen_controller.rs
+++ b/examples/configmapgen_controller.rs
@@ -7,7 +7,7 @@ use k8s_openapi::api::core::v1::ConfigMap;
 use kube::{
     api::{Api, ObjectMeta, Patch, PatchParams, Resource},
     runtime::{
-        controller::{Action, Controller},
+        controller::{Action, Controller, Config},
         watcher,
     },
     Client, CustomResource,
@@ -102,8 +102,11 @@ async fn main() -> Result<()> {
         }
     });
 
+    let mut config = Config::default();
+    config.set_debounce(Duration::from_secs(2));
     Controller::new(cmgs, watcher::Config::default())
         .owns(cms, watcher::Config::default())
+        .with_config(config)
         .reconcile_all_on(reload_rx.map(|_| ()))
         .shutdown_on_signal()
         .run(reconcile, error_policy, Arc::new(Data { client }))
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 80ed72fb9..f0cbeb49d 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -9,7 +9,7 @@ use crate::{
     },
     scheduler::{scheduler, ScheduleRequest},
     utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
-    watcher::{self, metadata_watcher, watcher, Config, DefaultBackoff},
+    watcher::{self, metadata_watcher, watcher, DefaultBackoff},
 };
 use backoff::backoff::Backoff;
 use derivative::Derivative;
@@ -234,6 +234,7 @@ impl Display for ReconcileReason {
 }
 
 const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
+const SCHEDULER_DEBOUNCE_PERIOD: Duration = Duration::from_secs(1);
 
 /// Apply a reconciler to an input stream, with a given retry policy
 ///
@@ -252,6 +253,7 @@ pub fn applier(
     context: Arc,
     store: Store,
     queue: QueueStream,
+    debounce: Option<Duration>,
 ) -> impl Stream, Action), Error>>
 where
     K: Clone + Resource + 'static,
@@ -291,7 +293,7 @@ where
         )),
         // all the Oks from the select gets passed through the scheduler stream, and are then executed
         move |s| {
-            Runner::new(scheduler(s), move |request| {
+            Runner::new(scheduler(s, debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))), move |request| {
                 let request = request.clone();
                 match store.get(&request.obj_ref) {
                     Some(obj) => {
@@ -417,6 +419,22 @@ where
     }
 }
 
+/// Config contains all the options that can be used to configure
+/// the behavior of the controller.
+#[derive(Clone, Debug, Default)]
+pub struct Config {
+    /// The debounce time that allows for deduplication of events, preventing
+    /// unnecessary reconciliations. By default, it is set to 1 second, but users
+    /// should modify it according to the needs of their controller.
+    debounce: Option<Duration>,
+}
+
+impl Config {
+    pub fn set_debounce(&mut self, debounce: Duration) {
+        self.debounce = Some(debounce);
+    }
+}
+
 /// Controller for a Resource `K`
 ///
 /// A controller is an infinite stream of objects to be reconciled.
@@ -505,6 +523,7 @@ where
     forceful_shutdown_selector: Vec>,
     dyntype: K::DynamicType,
     reader: Store,
+    config: Config,
 }
 
 impl Controller
 where
     K: Clone + Resource + Debug + 'static,
     K::DynamicType: Eq + Hash + Clone,
 {
     /// Create a Controller on a type `K`
     ///
     /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
     ///
-    /// The [`Config`] controls to the possible subset of objects of `K` that you want to manage
+    /// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
     /// and receive reconcile events for.
-    /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
+    /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
     #[must_use]
-    pub fn new(main_api: Api, wc: Config) -> Self
+    pub fn new(main_api: Api, wc: watcher::Config) -> Self
     where
         K::DynamicType: Default,
     {
@@ -531,17 +550,17 @@ where
     ///
     /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
     ///
-    /// The [`Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
+    /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
     /// to watch - in the Api's configured scope - and receive reconcile events for.
     /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
     ///
     /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
     ///
-    /// [`Config`]: crate::watcher::Config
+    /// [`watcher::Config`]: crate::watcher::Config
     /// [`Api`]: kube_client::Api
     /// [`dynamic`]: kube_client::core::dynamic
     /// [`Config::default`]: crate::watcher::Config::default
-    pub fn new_with(main_api: Api, wc: Config, dyntype: K::DynamicType) -> Self {
+    pub fn new_with(main_api: Api, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
         let writer = Writer::::new(dyntype.clone());
         let reader = writer.as_reader();
         let mut trigger_selector = stream::SelectAll::new();
@@ -564,9 +583,16 @@ where
             ],
             dyntype,
             reader,
+            config: Default::default(),
         }
     }
 
+    /// Specify the configuration for the controller's behavior.
+    pub fn with_config(mut self, config: Config) -> Self {
+        self.config = config;
+        self
+    }
+
     /// Create a Controller for a resource `K` from a stream of `K` objects
     ///
     /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
@@ -649,6 +675,7 @@ where
             ],
             dyntype,
             reader,
+            config: Default::default(),
         }
     }
 
@@ -683,7 +710,7 @@ where
     pub fn owns + DeserializeOwned + Debug + Send + 'static>(
         self,
         api: Api,
-        wc: Config,
+        wc: watcher::Config,
     ) -> Self {
         self.owns_with(api, (), wc)
     }
@@ -696,7 +723,7 @@ where
         mut self,
         api: Api,
         dyntype: Child::DynamicType,
-        wc: Config,
+        wc: watcher::Config,
     ) -> Self
     where
         Child::DynamicType: Debug + Eq + Hash + Clone,
@@ -847,7 +874,7 @@ where
     pub fn watches(
         self,
         api: Api,
-        wc: Config,
+        wc: watcher::Config,
         mapper: impl Fn(Other) -> I + Sync + Send + 'static,
     ) -> Self
     where
@@ -867,7 +894,7 @@ where
         mut self,
         api: Api,
         dyntype: Other::DynamicType,
-        wc: Config,
+        wc: watcher::Config,
         mapper: impl Fn(Other) -> I + Sync + Send + 'static,
     ) -> Self
     where
@@ -1214,6 +1241,7 @@ where
            self.reader,
            StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
                .take_until(future::select_all(self.graceful_shutdown_selector)),
+            self.config.debounce,
        )
        .take_until(futures::future::select_all(self.forceful_shutdown_selector))
    }
@@ -1298,15 +1326,18 @@ mod tests {
        let applier = applier(
            |obj, _| {
                Box::pin(async move {
-                    // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
+                    // Try to flood the rescheduling buffer by just putting it back in the queue
+                    // almost immediately, but making sure it's after the debounce time, so that the
+                    // scheduler actually runs the request.
                    println!("reconciling {:?}", obj.metadata.name);
-                    Ok(Action::requeue(Duration::ZERO))
+                    Ok(Action::requeue(Duration::from_millis(2)))
                })
            },
            |_: Arc, _: &Infallible, _| todo!(),
            Arc::new(()),
            store_rx,
            queue_rx.map(Result::<_, Infallible>::Ok),
+            Some(Duration::from_millis(1)),
        );
        pin_mut!(applier);
        for i in 0..items {
diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs
index 02fe35a74..b6f1ef714 100644
--- a/kube-runtime/src/controller/runner.rs
+++ b/kube-runtime/src/controller/runner.rs
@@ -158,7 +158,9 @@ mod tests {
        let mut count = 0;
        let (mut sched_tx, sched_rx) = mpsc::unbounded();
        let mut runner = Box::pin(
-            Runner::new(scheduler(sched_rx), |_| {
+            // The debounce period needs to be zero because otherwise the scheduler has a default
+            // debounce period of 1 ms, which would lead to the second request being discarded.
+            Runner::new(scheduler(sched_rx, Some(Duration::ZERO)), |_| {
                count += 1;
                // Panic if this ref is already held, to simulate some unsafe action..
                let mutex_ref = rc.borrow_mut();
@@ -203,7 +205,7 @@ mod tests {
        // pause();
        let (mut sched_tx, sched_rx) = mpsc::unbounded();
        let (result_tx, result_rx) = oneshot::channel();
-        let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
+        let mut runner = Runner::new(scheduler(sched_rx, None), |msg: &u8| futures::future::ready(*msg));
        // Start a background task that starts listening /before/ we enqueue the message
        // We can't just use Stream::poll_next(), since that bypasses the waker system
        Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
@@ -242,6 +244,7 @@ mod tests {
                    run_at: Instant::now(),
                }])
                .chain(stream::pending()),
+                None
            ),
            |msg| {
                assert!(*is_ready.lock().unwrap());
@@ -278,6 +281,7 @@ mod tests {
                    },
                ])
                .chain(stream::pending()),
+                None
            ),
            |msg| {
                assert!(*is_ready.lock().unwrap());
@@ -313,6 +317,7 @@ mod tests {
                    run_at: Instant::now(),
                }])
                .chain(stream::pending()),
+                None
            ),
            |()| {
                panic!("run_msg should never be invoked if readiness gate fails");
diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs
index 7ff1cd2e1..58a287f79 100644
--- a/kube-runtime/src/scheduler.rs
+++ b/kube-runtime/src/scheduler.rs
@@ -8,6 +8,7 @@ use std::{
    hash::Hash,
    pin::Pin,
    task::{Context, Poll},
+    time::Duration,
 };
 use tokio::time::Instant;
 use tokio_util::time::delay_queue::{self, DelayQueue};
@@ -44,15 +45,22 @@ pub struct Scheduler {
    /// Incoming queue of scheduling requests.
    #[pin]
    requests: Fuse,
+    /// Debounce time to allow for deduplication of requests.
+    debounce: Duration,
+    /// Expired contains the messages that have run, mapped to the time at which they ran.
+    /// It is used to make sure we don't run the same request twice within the debounce period.
+    expired: HashMap<T, Instant>,
 }
 
 impl Scheduler {
-    fn new(requests: R) -> Self {
+    fn new(requests: R, debounce: Duration) -> Self {
        Self {
            queue: DelayQueue::new(),
            scheduled: HashMap::new(),
            pending: HashSet::new(),
            requests: requests.fuse(),
+            debounce,
+            expired: HashMap::new(),
        }
    }
 }
@@ -66,11 +74,13 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
            // Message is already pending, so we can't even expedite it
            return;
        }
-        match self.scheduled.entry(request.message) {
+        match self.scheduled.entry(request.message.clone()) {
+            // If the new request is supposed to be earlier than the current entry's scheduled
+            // time (e.g. the new request is user-triggered and the current entry is the
+            // reconciler's usual retry), then give priority to the new request.
            Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
                // Old entry will run after the new request, so replace it..
                let entry = old_entry.get_mut();
-                // TODO: this should add a little delay here to actually debounce
                self.queue.reset_at(&entry.queue_key, request.run_at);
                entry.run_at = request.run_at;
                old_entry.replace_key();
@@ -81,6 +91,14 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
            Entry::Vacant(entry) => {
                // No old entry, we're free to go!
                let message = entry.key().clone();
+                if let Some(ran_at) = self.expired.get(&message) {
+                    // If the new request is supposed to be run at a time which is earlier
+                    // than the last time the same request was run + the configured debounce
+                    // time, then skip the request.
+ if request.run_at < *ran_at + *self.debounce { + return; + } + } entry.insert(ScheduledEntry { run_at: request.run_at, queue_key: self.queue.insert_at(message, request.run_at), @@ -96,6 +114,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { can_take_message: impl Fn(&T) -> bool, ) -> Poll { if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() { + self.expired.insert(msg.clone(), Instant::now()); return Poll::Ready(self.pending.take(&msg).unwrap()); } @@ -103,10 +122,11 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { match self.queue.poll_expired(cx) { Poll::Ready(Some(msg)) => { let msg = msg.into_inner(); - let (msg, _) = self.scheduled.remove_entry(&msg).expect( + let (msg, entry) = self.scheduled.remove_entry(&msg).expect( "Expired message was popped from the Scheduler queue, but was not in the metadata map", ); if can_take_message(&msg) { + self.expired.insert(msg.clone(), entry.run_at); break Poll::Ready(msg); } self.pending.insert(msg); @@ -202,8 +222,11 @@ where /// is ready for it). /// /// The [`Scheduler`] terminates as soon as `requests` does. -pub fn scheduler>>(requests: S) -> Scheduler { - Scheduler::new(requests) +pub fn scheduler>>( + requests: S, + debounce: Option, +) -> Scheduler { + Scheduler::new(requests, debounce.unwrap_or(Duration::ZERO)) } #[cfg(test)] @@ -238,6 +261,7 @@ mod tests { run_at: Instant::now(), }]) .on_complete(sleep(Duration::from_secs(4))), + None, )); assert!(!scheduler.contains_pending(&1)); assert!(poll!(scheduler.as_mut().hold_unless(|_| false).next()).is_pending()); @@ -254,7 +278,7 @@ mod tests { async fn scheduler_should_not_reschedule_pending_items() { pause(); let (mut tx, rx) = mpsc::unbounded::>(); - let mut scheduler = Box::pin(scheduler(rx)); + let mut scheduler = Box::pin(scheduler(rx, None)); tx.send(ScheduleRequest { message: 1, run_at: Instant::now(), @@ -295,6 +319,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(2))), + None, )); assert_eq!( scheduler.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(), @@ -317,6 +342,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), + None, ); pin_mut!(scheduler); assert!(poll!(scheduler.next()).is_pending()); @@ -344,6 +370,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), + None, ); pin_mut!(scheduler); assert!(poll!(scheduler.next()).is_pending()); @@ -368,6 +395,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), + None, ); pin_mut!(scheduler); assert!(poll!(scheduler.next()).is_pending()); @@ -381,7 +409,7 @@ mod tests { async fn scheduler_dedupe_should_allow_rescheduling_emitted_item() { pause(); let (mut schedule_tx, schedule_rx) = mpsc::unbounded(); - let mut scheduler = scheduler(schedule_rx); + let mut scheduler = scheduler(schedule_rx, None); schedule_tx .send(ScheduleRequest { message: (), @@ -423,6 +451,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), + None, ); assert_eq!(scheduler.map(|msg| msg.0).collect::>().await, vec![2]); } @@ -444,7 +473,66 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), + None, ); assert_eq!(scheduler.map(|msg| msg.0).collect::>().await, vec![1]); } + + #[tokio::test] + async fn scheduler_should_discard_same_message_within_debounce_period() { + pause(); + + let now = Instant::now(); + let (mut sched_tx, sched_rx) = mpsc::unbounded::>(); + let mut scheduler = scheduler( + sched_rx, + Some(Duration::from_secs(5)), + ); + + sched_tx.send(ScheduleRequest{ + message: SingletonMessage(1), + run_at: 
now + Duration::from_secs(1), + }).await.unwrap(); + assert!(poll!(scheduler.next()).is_pending()); + advance(Duration::from_secs(2)).await; + assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 1); + // make sure that we insert the popped message into expired + assert_eq!(scheduler.expired.len(), 1); + + sched_tx.send(ScheduleRequest{ + message: SingletonMessage(1), + run_at: now + Duration::from_secs(3), + }).await.unwrap(); + advance(Duration::from_secs(2)).await; + assert!(poll!(scheduler.next()).is_pending()); + } + + #[tokio::test] + async fn scheduler_should_emit_same_message_after_debounce_period() { + pause(); + + let now = Instant::now(); + let (mut sched_tx, sched_rx) = mpsc::unbounded::>(); + let mut scheduler = scheduler( + sched_rx, + Some(Duration::from_secs(1)), + ); + + sched_tx.send(ScheduleRequest{ + message: SingletonMessage(1), + run_at: now + Duration::from_secs(1), + }).await.unwrap(); + assert!(poll!(scheduler.next()).is_pending()); + advance(Duration::from_secs(2)).await; + assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 1); + // make sure that we insert the popped message into expired + assert_eq!(scheduler.expired.len(), 1); + + sched_tx.send(ScheduleRequest{ + message: SingletonMessage(2), + run_at: now + Duration::from_secs(3), + }).await.unwrap(); + advance(Duration::from_secs(2)).await; + assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 2); + } }
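
Usage sketch (not part of the patch above): a minimal, self-contained program showing how a controller built on this branch could opt into the new debounce knob via `controller::Config` and `with_config`, mirroring the configmapgen_controller change. The ConfigMap reconciler, error policy, empty context, and the tokio/k8s-openapi setup are illustrative placeholders, not code from this PR.

// Sketch only: assumes the Config / with_config API introduced by this patch,
// plus tokio, futures, k8s-openapi, and kube with the runtime feature enabled.
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    api::Api,
    runtime::{
        controller::{Action, Config, Controller},
        watcher,
    },
    Client,
};

// Placeholder reconciler: does no real work, requeues every 5 minutes.
async fn reconcile(_obj: Arc<ConfigMap>, _ctx: Arc<()>) -> Result<Action, kube::Error> {
    Ok(Action::requeue(Duration::from_secs(300)))
}

// Placeholder error policy: retry shortly after a failed reconcile.
fn error_policy(_obj: Arc<ConfigMap>, _err: &kube::Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(5))
}

#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let cms: Api<ConfigMap> = Api::default_namespaced(client);

    // Collapse bursts of events for the same object that arrive within 2s
    // into a single reconciliation, instead of the 1s default.
    let mut config = Config::default();
    config.set_debounce(Duration::from_secs(2));

    Controller::new(cms, watcher::Config::default())
        .with_config(config)
        .shutdown_on_signal()
        .run(reconcile, error_policy, Arc::new(()))
        .for_each(|res| async move {
            match res {
                Ok((obj, _)) => println!("reconciled {obj:?}"),
                Err(err) => eprintln!("reconcile failed: {err:?}"),
            }
        })
        .await;
    Ok(())
}

Since `Config` only exposes a setter, construction follows the same two-step `Config::default()` plus `set_debounce` pattern as the example; a longer debounce trades a little scheduling latency for fewer duplicate reconciliations when several watch events for the same object arrive in a burst, and the patch falls back to the 1-second default when nothing is set.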