1
1
use std:: collections:: BTreeMap ;
2
2
use std:: {
3
- cmp:: min,
4
3
collections:: { hash_map, HashMap } ,
5
4
num:: NonZeroUsize ,
6
5
pin:: Pin ,
@@ -230,7 +229,6 @@ struct ReduceState {
230
229
events : usize ,
231
230
fields : HashMap < String , Box < dyn ReduceValueMerger > > ,
232
231
stale_since : Instant ,
233
- last_flushed_at : Instant ,
234
232
metadata : EventMetadata ,
235
233
}
236
234
@@ -242,7 +240,6 @@ impl ReduceState {
242
240
Self {
243
241
events : 0 ,
244
242
stale_since : Instant :: now ( ) ,
245
- last_flushed_at : Instant :: now ( ) ,
246
243
fields,
247
244
metadata,
248
245
}
@@ -346,10 +343,9 @@ impl Reduce {
346
343
fn flush_into ( & mut self , output : & mut Vec < Event > ) {
347
344
let mut flush_discriminants = Vec :: new ( ) ;
348
345
let now = Instant :: now ( ) ;
349
- for ( k, t) in & mut self . reduce_merge_states {
350
- if now - min ( t. stale_since , t . last_flushed_at ) >= self . expire_after {
346
+ for ( k, t) in & self . reduce_merge_states {
347
+ if ( now - t. stale_since ) >= self . expire_after {
351
348
flush_discriminants. push ( k. clone ( ) ) ;
352
- t. last_flushed_at = Instant :: now ( ) ;
353
349
}
354
350
}
355
351
for k in & flush_discriminants {
@@ -476,8 +472,7 @@ impl TaskTransform<Event> for Reduce {
476
472
#[ cfg( test) ]
477
473
mod test {
478
474
use serde_json:: json;
479
- use tokio:: sync:: mpsc:: { self , Sender } ;
480
- use tokio:: time:: sleep;
475
+ use tokio:: sync:: mpsc;
481
476
use tokio_stream:: wrappers:: ReceiverStream ;
482
477
use value:: Kind ;
483
478
@@ -908,47 +903,4 @@ merge_strategies.bar = "concat"
908
903
} )
909
904
. await ;
910
905
}
911
-
912
- /// Tests the case where both starts_when and ends_when are not defined,
913
- /// and aggregation continues on and on, without flushing as long as events
914
- /// arrive in rate that is faster than the rate of expire_ms between events.
915
- #[ tokio:: test]
916
- async fn last_flush_at ( ) {
917
- let reduce_config = toml:: from_str :: < ReduceConfig > (
918
- r#"
919
- group_by = [ "user_id" ]
920
- expire_after_ms = 200
921
- flush_period_ms = 250
922
- "# ,
923
- )
924
- . unwrap ( ) ;
925
-
926
- assert_transform_compliance ( async move {
927
- let ( tx, rx) = mpsc:: channel ( 1 ) ;
928
- let ( topology, mut out) = create_topology ( ReceiverStream :: new ( rx) , reduce_config) . await ;
929
-
930
- async fn send_event ( tx : & Sender < Event > , user_id : i32 ) {
931
- let mut log_event = LogEvent :: from ( "test message" ) ;
932
- log_event. insert ( "user_id" , user_id. to_string ( ) ) ;
933
- tx. send ( log_event. into ( ) ) . await . unwrap ( ) ;
934
- }
935
-
936
- // send in a rate that is double than the rate of of expire_ms between events
937
- for _ in 0 ..5 {
938
- send_event ( & tx, 1 ) . await ;
939
- sleep ( Duration :: from_millis ( 50 ) ) . await ;
940
- send_event ( & tx, 2 ) . await ;
941
- sleep ( Duration :: from_millis ( 50 ) ) . await ;
942
- }
943
-
944
- // verify messages arrive during this time
945
- out. try_recv ( ) . expect ( "No message arrived" ) ;
946
- sleep ( Duration :: from_millis ( 10 ) ) . await ;
947
- out. try_recv ( ) . expect ( "No message arrived" ) ;
948
-
949
- drop ( tx) ;
950
- topology. stop ( ) . await ;
951
- } )
952
- . await ;
953
- }
954
906
}
0 commit comments