Skip to content

Commit ab45939

Browse files
authored
fix(reduce transform): Fix flush not occuring when events arrive in high rate (#16146)
* transforms/reduce: fix flush not occurring Fixes flush not occuring when events arrive in high frequency (in rate faster then `expire_after` between events). Fixes #16145. * transforms/reduce: add unit-tests
1 parent ba8c096 commit ab45939

File tree

1 file changed

+51
-3
lines changed

1 file changed

+51
-3
lines changed

src/transforms/reduce/mod.rs

+51-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::BTreeMap;
22
use std::{
3+
cmp::min,
34
collections::{hash_map, HashMap},
45
num::NonZeroUsize,
56
pin::Pin,
@@ -213,6 +214,7 @@ struct ReduceState {
213214
events: usize,
214215
fields: HashMap<String, Box<dyn ReduceValueMerger>>,
215216
stale_since: Instant,
217+
last_flushed_at: Instant,
216218
metadata: EventMetadata,
217219
}
218220

@@ -224,6 +226,7 @@ impl ReduceState {
224226
Self {
225227
events: 0,
226228
stale_since: Instant::now(),
229+
last_flushed_at: Instant::now(),
227230
fields,
228231
metadata,
229232
}
@@ -327,9 +330,10 @@ impl Reduce {
327330
fn flush_into(&mut self, output: &mut Vec<Event>) {
328331
let mut flush_discriminants = Vec::new();
329332
let now = Instant::now();
330-
for (k, t) in &self.reduce_merge_states {
331-
if (now - t.stale_since) >= self.expire_after {
333+
for (k, t) in &mut self.reduce_merge_states {
334+
if now - min(t.stale_since, t.last_flushed_at) >= self.expire_after {
332335
flush_discriminants.push(k.clone());
336+
t.last_flushed_at = Instant::now();
333337
}
334338
}
335339
for k in &flush_discriminants {
@@ -456,7 +460,8 @@ impl TaskTransform<Event> for Reduce {
456460
#[cfg(test)]
457461
mod test {
458462
use serde_json::json;
459-
use tokio::sync::mpsc;
463+
use tokio::sync::mpsc::{self, Sender};
464+
use tokio::time::sleep;
460465
use tokio_stream::wrappers::ReceiverStream;
461466
use value::Kind;
462467

@@ -884,4 +889,47 @@ merge_strategies.bar = "concat"
884889
})
885890
.await;
886891
}
892+
893+
/// Tests the case where both starts_when and ends_when are not defined,
894+
/// and aggregation continues on and on, without flushing as long as events
895+
/// arrive in rate that is faster than the rate of expire_ms between events.
896+
#[tokio::test]
897+
async fn last_flush_at() {
898+
let reduce_config = toml::from_str::<ReduceConfig>(
899+
r#"
900+
group_by = [ "user_id" ]
901+
expire_after_ms = 200
902+
flush_period_ms = 250
903+
"#,
904+
)
905+
.unwrap();
906+
907+
assert_transform_compliance(async move {
908+
let (tx, rx) = mpsc::channel(1);
909+
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
910+
911+
async fn send_event(tx: &Sender<Event>, user_id: i32) {
912+
let mut log_event = LogEvent::from("test message");
913+
log_event.insert("user_id", user_id.to_string());
914+
tx.send(log_event.into()).await.unwrap();
915+
}
916+
917+
// send in a rate that is double than the rate of of expire_ms between events
918+
for _ in 0..5 {
919+
send_event(&tx, 1).await;
920+
sleep(Duration::from_millis(50)).await;
921+
send_event(&tx, 2).await;
922+
sleep(Duration::from_millis(50)).await;
923+
}
924+
925+
// verify messages arrive during this time
926+
out.try_recv().expect("No message arrived");
927+
sleep(Duration::from_millis(10)).await;
928+
out.try_recv().expect("No message arrived");
929+
930+
drop(tx);
931+
topology.stop().await;
932+
})
933+
.await;
934+
}
887935
}

0 commit comments

Comments
 (0)