From 047a73fc8900dfbe8a5492258c6d2874611b61eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 6 Jul 2023 15:45:32 +0200 Subject: [PATCH] Fix deduplication test --- kube-runtime/src/controller/runner.rs | 33 ++++++++++++++++++--------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index fa38b6f21..b9290d33d 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -142,13 +142,18 @@ mod tests { }; use futures::{ channel::{mpsc, oneshot}, - future, poll, stream, SinkExt, StreamExt, TryStreamExt, + future, poll, stream, FutureExt, SinkExt, StreamExt, TryStreamExt, + }; + use std::{ + cell::RefCell, + collections::{HashMap, HashSet}, + sync::Mutex, + time::Duration, }; - use std::{cell::RefCell, collections::HashSet, sync::Mutex, time::Duration}; use tokio::{ runtime::Handle, task::yield_now, - time::{pause, sleep, timeout, Instant}, + time::{error::Elapsed, pause, sleep, timeout, Instant}, }; #[tokio::test] @@ -265,15 +270,15 @@ mod tests { scheduler( stream::iter([ ScheduleRequest { - message: 1u8, + message: 'a', run_at: Instant::now(), }, ScheduleRequest { - message: 2u8, + message: 'b', run_at: Instant::now(), }, ScheduleRequest { - message: 1u8, + message: 'a', run_at: Instant::now(), }, ]) @@ -289,11 +294,17 @@ mod tests { assert!(poll!(runner.next()).is_pending()); *is_ready.lock().unwrap() = true; delayed_init.init(()); - assert_eq!( - runner.as_mut().take(2).try_collect::>().await.unwrap(), - HashSet::from([1, 2]) - ); - assert!(poll!(runner.next()).is_pending()); + let mut message_counts = HashMap::new(); + assert!(timeout( + Duration::from_secs(1), + runner.try_for_each(|msg| { + *message_counts.entry(msg).or_default() += 1; + async { Ok(()) } + }) + ) + .await + .is_err()); + assert_eq!(message_counts, HashMap::from([('a', 1), ('b', 1)])); } #[tokio::test]