add controller::Config and debounce period to scheduler #1265

Merged
6 commits · merged Aug 8, 2023
Changes from 1 commit
5 changes: 4 additions & 1 deletion examples/configmapgen_controller.rs
@@ -7,7 +7,7 @@ use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ObjectMeta, Patch, PatchParams, Resource},
runtime::{
controller::{Action, Controller},
controller::{Action, Config, Controller},
watcher,
},
Client, CustomResource,
@@ -102,8 +102,11 @@ async fn main() -> Result<()> {
}
});

let mut config = Config::default();
config.debounce(Duration::from_secs(2));
Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
.with_config(config)
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(Data { client }))
127 changes: 81 additions & 46 deletions kube-runtime/src/controller/mod.rs
@@ -9,7 +9,7 @@ use crate::{
},
scheduler::{scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, metadata_watcher, watcher, Config, DefaultBackoff},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
@@ -234,6 +234,7 @@ impl Display for ReconcileReason {
}

const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
const SCHEDULER_DEBOUNCE_PERIOD: Duration = Duration::from_secs(1);
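A note on how this constant is used (see the `Runner::new` call further down): it is the applier-level fallback, not a floor. `applier` passes `debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))` to the scheduler, so callers that pass `None` still get a 1-second debounce, while an explicit `Some(Duration::ZERO)` disables debouncing. A minimal sketch of that fallback (the `effective_debounce` helper is illustrative, not part of the PR):

```rust
use std::time::Duration;

const SCHEDULER_DEBOUNCE_PERIOD: Duration = Duration::from_secs(1);

// Illustrative only: the real code inlines this as
// `debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))` at the `scheduler()` call site.
fn effective_debounce(debounce: Option<Duration>) -> Option<Duration> {
    debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))
}

fn main() {
    // `None` means "use the applier default", not "no debounce"...
    assert_eq!(effective_debounce(None), Some(Duration::from_secs(1)));
    // ...an explicit zero is what actually disables debouncing.
    assert_eq!(effective_debounce(Some(Duration::ZERO)), Some(Duration::ZERO));
}
```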

/// Apply a reconciler to an input stream, with a given retry policy
///
@@ -252,6 +253,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
debounce: Option<Duration>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
@@ -276,7 +278,7 @@ where
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
run_at: Instant::now(),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
@@ -291,39 +293,42 @@
)),
// all the Oks from the select gets passed through the scheduler stream, and are then executed
move |s| {
Runner::new(scheduler(s), move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
Runner::new(
scheduler(s, debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))),
move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
},
)
.delay_tasks_until(async move {
tracing::debug!("applier runner held until store is ready");
let res = delay_store.wait_until_ready().await;
@@ -417,6 +422,22 @@ where
}
}

/// Config contains all the options that can be used to configure
/// the behavior of the controller.
#[derive(Clone, Debug, Default)]
pub struct Config {
/// The debounce time that allows for deduplication of events, preventing
/// unnecessary reconciliations. By default, it is set to 1 second, but users
/// should modify it according to the needs of their controller.
debounce: Option<Duration>,
}

impl Config {
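/// Sets the debounce period for the controller's scheduler, within which
/// repeated events for the same object are deduplicated into one reconciliation.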
pub fn debounce(&mut self, debounce: Duration) {
self.debounce = Some(debounce);
}
}
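A minimal usage sketch of this builder, mirroring the configmapgen example at the top of this diff (the `build_config` helper is illustrative, not part of the PR):

```rust
use std::time::Duration;
use kube::runtime::controller::Config;

fn build_config() -> Config {
    // `debounce` takes `&mut self` and returns `()`, so it is not chainable
    // at this commit: build the value first, then hand it to
    // `Controller::with_config` (added further down in this diff).
    let mut config = Config::default();
    config.debounce(Duration::from_millis(500)); // coalesce same-object events within 500ms
    config
}
```

Leaving the config untouched keeps `debounce` as `None`, which the applier above maps to the 1-second `SCHEDULER_DEBOUNCE_PERIOD` default.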

/// Controller for a Resource `K`
///
/// A controller is an infinite stream of objects to be reconciled.
@@ -505,6 +526,7 @@ where
forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
dyntype: K::DynamicType,
reader: Store<K>,
config: Config,
}

impl<K> Controller<K>
@@ -516,11 +538,11 @@ where
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] controls the possible subset of objects of `K` that you want to manage
/// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
/// and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
/// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
#[must_use]
pub fn new(main_api: Api<K>, wc: Config) -> Self
pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
where
K::DynamicType: Default,
{
@@ -531,17 +553,17 @@ where
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// to watch - in the Api's configured scope - and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
///
/// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
///
/// [`Config`]: crate::watcher::Config
/// [`watcher::Config`]: crate::watcher::Config
/// [`Api`]: kube_client::Api
/// [`dynamic`]: kube_client::core::dynamic
/// [`Config::default`]: crate::watcher::Config::default
pub fn new_with(main_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
@@ -564,6 +586,7 @@ where
],
dyntype,
reader,
config: Default::default(),
}
}

@@ -649,9 +672,17 @@ where
],
dyntype,
reader,
config: Default::default(),
}
}

/// Specify the configuration for the controller's behavior.
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {
self.config = config;
self
}

/// Specify the backoff policy for "trigger" watches
///
/// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
@@ -683,7 +714,7 @@ where
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
wc: Config,
wc: watcher::Config,
) -> Self {
self.owns_with(api, (), wc)
}
@@ -696,7 +727,7 @@ where
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
wc: Config,
wc: watcher::Config,
) -> Self
where
Child::DynamicType: Debug + Eq + Hash + Clone,
@@ -847,7 +878,7 @@ where
pub fn watches<Other, I>(
self,
api: Api<Other>,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
@@ -867,7 +898,7 @@ where
mut self,
api: Api<Other>,
dyntype: Other::DynamicType,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
@@ -1214,6 +1245,7 @@ where
self.reader,
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
self.config.debounce,
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
@@ -1298,15 +1330,18 @@ mod tests {
let applier = applier(
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer by just putting it back in the queue immediately
// Try to flood the rescheduling buffer by just putting it back in the queue
// almost immediately, but making sure it's after the debounce time, so that the
// scheduler actually runs the request.
println!("reconciling {:?}", obj.metadata.name);
Ok(Action::requeue(Duration::ZERO))
Ok(Action::requeue(Duration::from_millis(2)))
})
},
|_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
Some(Duration::from_millis(1)),
);
pin_mut!(applier);
for i in 0..items {
9 changes: 7 additions & 2 deletions kube-runtime/src/controller/runner.rs
@@ -158,7 +158,9 @@ mod tests {
let mut count = 0;
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
Runner::new(scheduler(sched_rx), |_| {
// The debounce period needs to be zero because otherwise the scheduler has a default
// debounce period of 1 ms, which would lead to the second request being discarded.
Runner::new(scheduler(sched_rx, Some(Duration::ZERO)), |_| {
count += 1;
// Panic if this ref is already held, to simulate some unsafe action..
let mutex_ref = rc.borrow_mut();
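The second argument to `scheduler` introduced in this commit sets the debounce window; a standalone sketch of the two call patterns used in these tests (paths assume the public `kube_runtime::scheduler` module; semantics as described in the comments above):

```rust
use std::time::Duration;
use futures::channel::mpsc;
use kube_runtime::scheduler::{scheduler, ScheduleRequest};

fn main() {
    let (_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<u8>>();

    // `Some(Duration::ZERO)` disables debouncing entirely, so back-to-back
    // requests for the same message are all delivered (what the test above
    // needs in order to count two runs):
    let _sched = scheduler(sched_rx, Some(Duration::ZERO));

    // The alternative, `scheduler(rx, None)`, accepts the scheduler's own
    // default (1 ms at this commit), which coalesces same-message requests
    // that land inside the window.
}
```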
@@ -203,7 +205,7 @@ mod tests {
// pause();
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let (result_tx, result_rx) = oneshot::channel();
let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
let mut runner = Runner::new(scheduler(sched_rx, None), |msg: &u8| futures::future::ready(*msg));
// Start a background task that starts listening /before/ we enqueue the message
// We can't just use Stream::poll_next(), since that bypasses the waker system
Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
@@ -242,6 +244,7 @@ mod tests {
run_at: Instant::now(),
}])
.chain(stream::pending()),
None,
),
|msg| {
assert!(*is_ready.lock().unwrap());
@@ -278,6 +281,7 @@ mod tests {
},
])
.chain(stream::pending()),
None,
),
|msg| {
assert!(*is_ready.lock().unwrap());
@@ -313,6 +317,7 @@ mod tests {
run_at: Instant::now(),
}])
.chain(stream::pending()),
None,
),
|()| {
panic!("run_msg should never be invoked if readiness gate fails");