Skip to content

Commit e86b155

Browse files
authored
fix(reduce transform): Revert flushing on interval change to expire_metrics_ms (vectordotdev#17084)
Revert "fix(reduce transform): Fix flush not occuring when events arrive in high rate (vectordotdev#16146)" This reverts commit ab45939.
1 parent 5d88655 commit e86b155

File tree

1 file changed

+3
-51
lines changed

1 file changed

+3
-51
lines changed

src/transforms/reduce/mod.rs

+3-51
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::collections::BTreeMap;
22
use std::{
3-
cmp::min,
43
collections::{hash_map, HashMap},
54
num::NonZeroUsize,
65
pin::Pin,
@@ -230,7 +229,6 @@ struct ReduceState {
230229
events: usize,
231230
fields: HashMap<String, Box<dyn ReduceValueMerger>>,
232231
stale_since: Instant,
233-
last_flushed_at: Instant,
234232
metadata: EventMetadata,
235233
}
236234

@@ -242,7 +240,6 @@ impl ReduceState {
242240
Self {
243241
events: 0,
244242
stale_since: Instant::now(),
245-
last_flushed_at: Instant::now(),
246243
fields,
247244
metadata,
248245
}
@@ -346,10 +343,9 @@ impl Reduce {
346343
fn flush_into(&mut self, output: &mut Vec<Event>) {
347344
let mut flush_discriminants = Vec::new();
348345
let now = Instant::now();
349-
for (k, t) in &mut self.reduce_merge_states {
350-
if now - min(t.stale_since, t.last_flushed_at) >= self.expire_after {
346+
for (k, t) in &self.reduce_merge_states {
347+
if (now - t.stale_since) >= self.expire_after {
351348
flush_discriminants.push(k.clone());
352-
t.last_flushed_at = Instant::now();
353349
}
354350
}
355351
for k in &flush_discriminants {
@@ -476,8 +472,7 @@ impl TaskTransform<Event> for Reduce {
476472
#[cfg(test)]
477473
mod test {
478474
use serde_json::json;
479-
use tokio::sync::mpsc::{self, Sender};
480-
use tokio::time::sleep;
475+
use tokio::sync::mpsc;
481476
use tokio_stream::wrappers::ReceiverStream;
482477
use value::Kind;
483478

@@ -908,47 +903,4 @@ merge_strategies.bar = "concat"
908903
})
909904
.await;
910905
}
911-
912-
/// Tests the case where both starts_when and ends_when are not defined,
913-
/// and aggregation continues on and on, without flushing as long as events
914-
/// arrive in rate that is faster than the rate of expire_ms between events.
915-
#[tokio::test]
916-
async fn last_flush_at() {
917-
let reduce_config = toml::from_str::<ReduceConfig>(
918-
r#"
919-
group_by = [ "user_id" ]
920-
expire_after_ms = 200
921-
flush_period_ms = 250
922-
"#,
923-
)
924-
.unwrap();
925-
926-
assert_transform_compliance(async move {
927-
let (tx, rx) = mpsc::channel(1);
928-
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
929-
930-
async fn send_event(tx: &Sender<Event>, user_id: i32) {
931-
let mut log_event = LogEvent::from("test message");
932-
log_event.insert("user_id", user_id.to_string());
933-
tx.send(log_event.into()).await.unwrap();
934-
}
935-
936-
// send in a rate that is double than the rate of of expire_ms between events
937-
for _ in 0..5 {
938-
send_event(&tx, 1).await;
939-
sleep(Duration::from_millis(50)).await;
940-
send_event(&tx, 2).await;
941-
sleep(Duration::from_millis(50)).await;
942-
}
943-
944-
// verify messages arrive during this time
945-
out.try_recv().expect("No message arrived");
946-
sleep(Duration::from_millis(10)).await;
947-
out.try_recv().expect("No message arrived");
948-
949-
drop(tx);
950-
topology.stop().await;
951-
})
952-
.await;
953-
}
954906
}

0 commit comments

Comments
 (0)