Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tests): Adjust buffering tests #2862

Merged
merged 4 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,11 @@ pub struct CountReceiver {
}

impl CountReceiver {
pub fn wait(self) -> usize {
pub fn cancel(self) {
self.trigger.cancel();
}

pub fn wait(self) -> usize {
self.handle.wait().unwrap()
}
}
Expand Down
84 changes: 45 additions & 39 deletions tests/buffering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

use futures01::{Future, Sink};
use prost::Message;
use std::sync::atomic::Ordering;
use tempfile::tempdir;
use tracing::trace;
use vector::event;
use vector::test_util::{self, next_addr};
use vector::test_util::{self, next_addr, wait_for_atomic_usize};
use vector::topology::{self, config};
use vector::{buffers::BufferConfig, runtime, sinks};

Expand Down Expand Up @@ -62,7 +61,6 @@ fn test_buffering() {
let mut rt = test_util::runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up.

let (input_events, input_events_stream) =
test_util::random_events_with_stream(line_length, num_events);
Expand All @@ -71,17 +69,25 @@ fn test_buffering() {
.send_all(input_events_stream);
let _ = rt.block_on(send).unwrap();

// There was a race caused by a channel in the mock source, and this
// check is here to ensure it's really gone.
assert_eq!(source_event_counter.load(Ordering::Acquire), num_events);

// Give the topology some time to process the received data and simulate
// a crash.
// We need to wait for at least the source to process events.
// This is to avoid a race after we send all events, at that point two things
// can happen in any order, we reaching `terminate_abruptly` and source processing
// all of the events. We need for the source to process events before `terminate_abruptly`
// so we wait for that here.
wait_for_atomic_usize(source_event_counter, |x| x == num_events);
// Now we give it some time for the events to propagate to File.
// This is to avoid a race after the source processes all events, at that point two things
// can happen in any order, we reaching `terminate_abruptly` and events being written
// to file. We need for the events to be written to the file before `terminate_abruptly`.
// We can't know when exactly all of the events have been written, so we have to guess.
// But it should be shortly after source processing all of the events.
std::thread::sleep(std::time::Duration::from_secs(1));
// Simulate a crash.
terminate_abruptly(rt, topology);

// Then run vector again with a sink that accepts events now. It should
// send all of the events from the first run.
let (in_tx, source_config, source_event_counter) = support::source_with_event_counter();
let (in_tx, source_config) = support::source();
let (out_rx, sink_config) = support::sink(10);
let config = {
let mut config = config::Config::empty();
Expand All @@ -98,7 +104,6 @@ fn test_buffering() {
let mut rt = test_util::runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up.

let (input_events2, input_events_stream) =
test_util::random_events_with_stream(line_length, num_events);
Expand All @@ -110,10 +115,6 @@ fn test_buffering() {

let output_events = test_util::receive_events(out_rx);

// There was a race caused by a channel in the mock source, and this
// check is here to ensure it's really gone.
assert_eq!(source_event_counter.load(Ordering::Acquire), num_events);

terminate_gracefully(rt, topology);

let output_events = output_events.wait();
Expand Down Expand Up @@ -162,20 +163,26 @@ fn test_max_size() {
let mut rt = test_util::runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up.

let send = in_tx
.sink_map_err(|err| panic!(err))
.send_all(input_events_stream);
let _ = rt.block_on(send).unwrap();
std::thread::sleep(std::time::Duration::from_secs(2));

// There was a race caused by a channel in the mock source, and this
// check is here to ensure it's really gone.
assert_eq!(source_event_counter.load(Ordering::Acquire), num_events);

// Give the topology some time to process the received data and simulate
// a crash.
// We need to wait for at least the source to process events.
// This is to avoid a race after we send all events, at that point two things
// can happen in any order, we reaching `terminate_abruptly` and source processing
// all of the events. We need for the source to process events before `terminate_abruptly`
// so we wait for that here.
wait_for_atomic_usize(source_event_counter, |x| x == num_events);
// Now we give it some time for the events to propagate to File.
// This is to avoid a race after the source processes all events, at that point two things
// can happen in any order, we reaching `terminate_abruptly` and events being written
// to file. We need for the events to be written to the file before `terminate_abruptly`.
// We can't know when exactly all of the events have been written, so we have to guess.
// But it should be shortly after source processing all of the events.
std::thread::sleep(std::time::Duration::from_secs(1));
// Simulate a crash.
terminate_abruptly(rt, topology);

// Then run vector again with a sink that accepts events now. It should
Expand All @@ -198,7 +205,6 @@ fn test_max_size() {
let mut rt = test_util::runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up.

let output_events = test_util::receive_events(out_rx);

Expand Down Expand Up @@ -244,7 +250,6 @@ fn test_max_size_resume() {
let mut rt = runtime::Runtime::new().unwrap();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up.

// Send all of the input events _before_ the output sink is ready.
// This causes the writers to stop writing to the on-disk buffer, and once
Expand Down Expand Up @@ -301,29 +306,35 @@ fn test_reclaim_disk_space() {
let mut rt = test_util::runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up.

let (input_events, input_events_stream) =
test_util::random_events_with_stream(line_length, num_events);
let send = in_tx
.sink_map_err(|err| panic!(err))
.send_all(input_events_stream);
let _ = rt.block_on(send).unwrap();
std::thread::sleep(std::time::Duration::from_secs(5));

// There was a race caused by a channel in the mock source, and this
// check is here to ensure it's really gone.
assert_eq!(source_event_counter.load(Ordering::Acquire), num_events);

// Give the topology some time to process the received data and simulate
// a crash.
// We need to wait for at least the source to process events.
// This is to avoid a race after we send all events, at that point two things
// can happen in any order, we reaching `terminate_abruptly` and source processing
// all of the events. We need for the source to process events before `terminate_abruptly`
// so we wait for that here.
wait_for_atomic_usize(source_event_counter, |x| x == num_events);
// Now we give it some time for the events to propagate to File.
// This is to avoid a race after the source processes all events, at that point two things
// can happen in any order, we reaching `terminate_abruptly` and events being written
// to file. We need for the events to be written to the file before `terminate_abruptly`.
// We can't know when exactly all of the events have been written, so we have to guess.
// But it should be shortly after source processing all of the events.
std::thread::sleep(std::time::Duration::from_secs(1));
// Simulate a crash.
terminate_abruptly(rt, topology);

let before_disk_size: u64 = compute_disk_size(&data_dir);

// Then run vector again with a sink that accepts events now. It should
// send all of the events from the first run.
let (in_tx, source_config, source_event_counter) = support::source_with_event_counter();
let (in_tx, source_config) = support::source();
let (out_rx, sink_config) = support::sink(10);
let config = {
let mut config = config::Config::empty();
Expand All @@ -340,7 +351,6 @@ fn test_reclaim_disk_space() {
let mut rt = test_util::runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up.

let (input_events2, input_events_stream) =
test_util::random_events_with_stream(line_length, num_events);
Expand All @@ -352,10 +362,6 @@ fn test_reclaim_disk_space() {

let output_events = test_util::receive_events(out_rx);

// There was a race caused by a channel in the mock source, and this
// check is here to ensure it's really gone.
assert_eq!(source_event_counter.load(Ordering::Acquire), num_events);

terminate_gracefully(rt, topology);

let output_events = output_events.wait();
Expand Down