Skip to content

Commit

Permalink
chore(tests): Remove `s3_waits_for_full_batch_or_timeout_before_sendi…
Browse files Browse the repository at this point in the history
…ng` (#3105)

Signed-off-by: ktf <[email protected]>
  • Loading branch information
ktff authored Jul 19, 2020
1 parent e31f20d commit f37f2e0
Showing 1 changed file with 0 additions and 70 deletions.
70 changes: 0 additions & 70 deletions src/sinks/aws_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,76 +625,6 @@ mod integration_tests {
});
}

#[test]
fn s3_waits_for_full_batch_or_timeout_before_sending() {
let mut rt = runtime();
let cx = SinkContext::new_test();

let config = rt.block_on_std(async {
S3SinkConfig {
key_prefix: Some(format!("{}/{}", random_string(10), "{{i}}")),
filename_time_format: Some("waitsforfullbatch".into()),
filename_append_uuid: Some(false),
..config(1010).await
}
});

let prefix = config.key_prefix.clone();
let client = config.create_client(cx.resolver()).unwrap();
let sink = config.new(client, cx).unwrap();

let (lines, _) = random_lines_with_stream(100, 30);

let (tx, rx) = futures01::sync::mpsc::channel(1);

let mut rt = runtime();
rt.spawn_std(async {
let _ = sink.send_all(rx).compat().await.unwrap();
});

let mut tx = tx.wait();

for (i, line) in lines.iter().enumerate().take(15) {
let mut event = Event::from(line.as_str());

let i = if i < 10 { 1 } else { 2 };

event.as_mut_log().insert("i", format!("{}", i));
tx.send(event).unwrap();
}

std::thread::sleep(std::time::Duration::from_millis(100));

for (i, line) in lines.iter().skip(15).enumerate() {
let mut event = Event::from(line.as_str());

let i = if i < 5 { 2 } else { 3 };

event.as_mut_log().insert("i", format!("{}", i));
tx.send(event).unwrap();
}

drop(tx);

rt.block_on_std(async move {
let keys = get_keys(prefix.unwrap()).await;
assert_eq!(keys.len(), 3);

let response_lines = stream::iter(keys)
.fold(Vec::new(), |mut acc, key| async {
acc.push(get_lines(get_object(key).await).await);
acc
})
.await;

assert_eq!(&lines[00..10], response_lines[0].as_slice());
assert_eq!(&lines[10..20], response_lines[1].as_slice());
assert_eq!(&lines[20..30], response_lines[2].as_slice());
});

crate::test_util::shutdown_on_idle(rt);
}

#[test]
fn s3_gzip() {
let mut rt = runtime();
Expand Down

0 comments on commit f37f2e0

Please sign in to comment.