From 7c23b04a98e7a32d2722d86e58264b56fdc8304a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 8 Jun 2022 12:53:32 +0200 Subject: [PATCH] Add test for #926 --- kube-runtime/src/controller/mod.rs | 66 +++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 5 deletions(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 5be9bacda..517e2881a 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -766,12 +766,18 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - - use super::Action; - use crate::Controller; + use std::{convert::Infallible, sync::Arc, time::Duration}; + + use super::{Action, APPLIER_REQUEUE_BUF_SIZE}; + use crate::{ + applier, + reflector::{self, ObjectRef}, + watcher, Controller, + }; + use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::ConfigMap; - use kube_client::Api; + use kube_client::{core::ObjectMeta, Api}; + use tokio::time::timeout; fn assert_send(x: T) -> T { x @@ -794,4 +800,54 @@ mod tests { ), ); } + + #[tokio::test] + async fn applier_must_not_deadlock_if_reschedule_buffer_fills() { + // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles + // This is intended to avoid regressing on https://github.com/kube-rs/kube-rs/issues/926 + + // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 100x the number of objects "in rotation" + // On my (@teozkr)'s 3900X I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives + let items = APPLIER_REQUEUE_BUF_SIZE * 50; + // Assume that everything's OK if we can reconcile every object 10 times on average + let reconciles = items * 10; + + let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::>(); + let (store_rx, mut store_tx) = reflector::store(); + let applier = tokio::spawn( + applier( + |obj, _| { + Box::pin(async move { + // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately + println!("reconciling {:?}", obj.metadata.name); + Ok(Action::requeue(Duration::ZERO)) + }) + }, + |_: &Infallible, _| todo!(), + Arc::new(()), + store_rx, + queue_rx.map(Result::<_, Infallible>::Ok), + ) + .take(reconciles) + .try_for_each(|_| async { Ok(()) }), + ); + for i in 0..items { + let obj = ConfigMap { + metadata: ObjectMeta { + name: Some(format!("cm-{i}")), + namespace: Some("default".to_string()), + ..Default::default() + }, + ..Default::default() + }; + store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone())); + queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap(); + } + // Keep the submission queue open to avoid going into graceful shutdown mode + timeout(Duration::from_secs(10), applier) + .await + .expect("test timeout expired, applier likely deadlocked") + .unwrap() + .unwrap(); + } }