From 359e12304adda2edf88fdb61a61284c3279b5f64 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Thu, 18 Jun 2020 17:32:35 +0200 Subject: [PATCH 1/4] Fix buffering tests Signed-off-by: Kruno Tomola Fabro --- tests/buffering.rs | 56 +++++++++++++++------------------------------- 1 file changed, 18 insertions(+), 38 deletions(-) diff --git a/tests/buffering.rs b/tests/buffering.rs index 05591d7b85cb6..69d090f8fd9aa 100644 --- a/tests/buffering.rs +++ b/tests/buffering.rs @@ -8,7 +8,7 @@ use std::sync::atomic::Ordering; use tempfile::tempdir; use tracing::trace; use vector::event; -use vector::test_util::{self, next_addr}; +use vector::test_util::{self, next_addr, wait_for}; use vector::topology::{self, config}; use vector::{buffers::BufferConfig, runtime, sinks}; @@ -62,7 +62,6 @@ fn test_buffering() { let mut rt = test_util::runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up. let (input_events, input_events_stream) = test_util::random_events_with_stream(line_length, num_events); @@ -71,17 +70,16 @@ fn test_buffering() { .send_all(input_events_stream); let _ = rt.block_on(send).unwrap(); - // There was a race caused by a channel in the mock source, and this - // check is here to ensure it's really gone. - assert_eq!(source_event_counter.load(Ordering::Acquire), num_events); - - // Give the topology some time to process the received data and simulate - // a crash. + // We need to wait for at least the source to process events. + wait_for(|| source_event_counter.load(Ordering::Acquire) == num_events); + // Now we give it some time for the events to propagate to Disk. + std::thread::sleep(std::time::Duration::from_secs(1)); + // Simulate a crash. terminate_abruptly(rt, topology); // Then run vector again with a sink that accepts events now. It should // send all of the events from the first run. 
- let (in_tx, source_config, source_event_counter) = support::source_with_event_counter(); + let (in_tx, source_config) = support::source(); let (out_rx, sink_config) = support::sink(10); let config = { let mut config = config::Config::empty(); @@ -98,7 +96,6 @@ fn test_buffering() { let mut rt = test_util::runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up. let (input_events2, input_events_stream) = test_util::random_events_with_stream(line_length, num_events); @@ -110,10 +107,6 @@ fn test_buffering() { let output_events = test_util::receive_events(out_rx); - // There was a race caused by a channel in the mock source, and this - // check is here to ensure it's really gone. - assert_eq!(source_event_counter.load(Ordering::Acquire), num_events); - terminate_gracefully(rt, topology); let output_events = output_events.wait(); @@ -162,20 +155,17 @@ fn test_max_size() { let mut rt = test_util::runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up. let send = in_tx .sink_map_err(|err| panic!(err)) .send_all(input_events_stream); let _ = rt.block_on(send).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(2)); - - // There was a race caused by a channel in the mock source, and this - // check is here to ensure it's really gone. - assert_eq!(source_event_counter.load(Ordering::Acquire), num_events); - // Give the topology some time to process the received data and simulate - // a crash. + // We need to wait for at least the source to process events. + wait_for(|| source_event_counter.load(Ordering::Acquire) == num_events); + // Now we give it some time for the events to propagate to Disk. + std::thread::sleep(std::time::Duration::from_secs(1)); + // Simulate a crash. 
terminate_abruptly(rt, topology); // Then run vector again with a sink that accepts events now. It should @@ -198,7 +188,6 @@ fn test_max_size() { let mut rt = test_util::runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up. let output_events = test_util::receive_events(out_rx); @@ -244,7 +233,6 @@ fn test_max_size_resume() { let mut rt = runtime::Runtime::new().unwrap(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up. // Send all of the input events _before_ the output sink is ready. // This causes the writers to stop writing to the on-disk buffer, and once @@ -301,7 +289,6 @@ fn test_reclaim_disk_space() { let mut rt = test_util::runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up. let (input_events, input_events_stream) = test_util::random_events_with_stream(line_length, num_events); @@ -309,21 +296,19 @@ fn test_reclaim_disk_space() { .sink_map_err(|err| panic!(err)) .send_all(input_events_stream); let _ = rt.block_on(send).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(5)); - // There was a race caused by a channel in the mock source, and this - // check is here to ensure it's really gone. - assert_eq!(source_event_counter.load(Ordering::Acquire), num_events); - - // Give the topology some time to process the received data and simulate - // a crash. + // We need to wait for at least the source to process events. + wait_for(|| source_event_counter.load(Ordering::Acquire) == num_events); + // Now we give it some time for the events to propagate to Disk. + std::thread::sleep(std::time::Duration::from_secs(1)); + // Simulate a crash. 
terminate_abruptly(rt, topology); let before_disk_size: u64 = compute_disk_size(&data_dir); // Then run vector again with a sink that accepts events now. It should // send all of the events from the first run. - let (in_tx, source_config, source_event_counter) = support::source_with_event_counter(); + let (in_tx, source_config) = support::source(); let (out_rx, sink_config) = support::sink(10); let config = { let mut config = config::Config::empty(); @@ -340,7 +325,6 @@ fn test_reclaim_disk_space() { let mut rt = test_util::runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); // Give topo a moment to start up. let (input_events2, input_events_stream) = test_util::random_events_with_stream(line_length, num_events); @@ -352,10 +336,6 @@ fn test_reclaim_disk_space() { let output_events = test_util::receive_events(out_rx); - // There was a race caused by a channel in the mock source, and this - // check is here to ensure it's really gone. 
- assert_eq!(source_event_counter.load(Ordering::Acquire), num_events); - terminate_gracefully(rt, topology); let output_events = output_events.wait(); From ef789387d3591150e77171a4b57e840ee3d5464f Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Thu, 18 Jun 2020 17:32:51 +0200 Subject: [PATCH 2/4] Split wait Signed-off-by: Kruno Tomola Fabro --- src/test_util.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test_util.rs b/src/test_util.rs index 413ff061627f9..2019ad31b77db 100644 --- a/src/test_util.rs +++ b/src/test_util.rs @@ -442,8 +442,11 @@ pub struct CountReceiver { } impl CountReceiver { - pub fn wait(self) -> usize { + pub fn cancel(self) { self.trigger.cancel(); + } + + pub fn wait(self) -> usize { self.handle.wait().unwrap() } } From 6cf745d1bf68b21bd7259e17e3eeebf63748e0a4 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Wed, 24 Jun 2020 16:13:49 +0200 Subject: [PATCH 3/4] Document the race Signed-off-by: Kruno Tomola Fabro --- tests/buffering.rs | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/tests/buffering.rs b/tests/buffering.rs index 69d090f8fd9aa..d444c1b3ed428 100644 --- a/tests/buffering.rs +++ b/tests/buffering.rs @@ -8,7 +8,7 @@ use std::sync::atomic::Ordering; use tempfile::tempdir; use tracing::trace; use vector::event; -use vector::test_util::{self, next_addr, wait_for}; +use vector::test_util::{self, next_addr, wait_for, wait_for_atomic_usize}; use vector::topology::{self, config}; use vector::{buffers::BufferConfig, runtime, sinks}; @@ -71,8 +71,17 @@ fn test_buffering() { let _ = rt.block_on(send).unwrap(); // We need to wait for at least the source to process events. - wait_for(|| source_event_counter.load(Ordering::Acquire) == num_events); - // Now we give it some time for the events to propagate to Disk. 
+ // This avoids a race. Once all events have been sent, two things can happen + // in either order: we reach `terminate_abruptly`, or the source finishes processing + // all of the events. The source must process the events before `terminate_abruptly`, + // so we wait for that here. + wait_for_atomic_usize(source_event_counter, |x| x == num_events); + // Now we give the events some time to propagate to the file. + // This avoids a second race. Once the source has processed all events, two things + // can happen in either order: we reach `terminate_abruptly`, or the events are written + // to the file. The events must be written to the file before `terminate_abruptly`. + // We can't know exactly when all of the events have been written, so we have to guess, + // but it should be shortly after the source has processed all of the events. std::thread::sleep(std::time::Duration::from_secs(1)); // Simulate a crash. terminate_abruptly(rt, topology); @@ -162,8 +171,17 @@ fn test_max_size() { let _ = rt.block_on(send).unwrap(); // We need to wait for at least the source to process events. - wait_for(|| source_event_counter.load(Ordering::Acquire) == num_events); - // Now we give it some time for the events to propagate to Disk. + // This avoids a race. Once all events have been sent, two things can happen + // in either order: we reach `terminate_abruptly`, or the source finishes processing + // all of the events. The source must process the events before `terminate_abruptly`, + // so we wait for that here. + wait_for_atomic_usize(source_event_counter, |x| x == num_events); + // Now we give the events some time to propagate to the file. + // This avoids a second race. Once the source has processed all events, two things + // can happen in either order: we reach `terminate_abruptly`, or the events are written + // to the file. The events must be written to the file before `terminate_abruptly`. 
+ // We can't know exactly when all of the events have been written, so we have to guess, + // but it should be shortly after the source has processed all of the events. std::thread::sleep(std::time::Duration::from_secs(1)); // Simulate a crash. terminate_abruptly(rt, topology); @@ -298,8 +316,17 @@ fn test_reclaim_disk_space() { let _ = rt.block_on(send).unwrap(); // We need to wait for at least the source to process events. - wait_for(|| source_event_counter.load(Ordering::Acquire) == num_events); - // Now we give it some time for the events to propagate to Disk. + // This avoids a race. Once all events have been sent, two things can happen + // in either order: we reach `terminate_abruptly`, or the source finishes processing + // all of the events. The source must process the events before `terminate_abruptly`, + // so we wait for that here. + wait_for_atomic_usize(source_event_counter, |x| x == num_events); + // Now we give the events some time to propagate to the file. + // This avoids a second race. Once the source has processed all events, two things + // can happen in either order: we reach `terminate_abruptly`, or the events are written + // to the file. The events must be written to the file before `terminate_abruptly`. + // We can't know exactly when all of the events have been written, so we have to guess, + // but it should be shortly after the source has processed all of the events. std::thread::sleep(std::time::Duration::from_secs(1)); // Simulate a crash. 
 terminate_abruptly(rt, topology); From d97018e66cce0c2f6675c0cb529d4d89fb899d08 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Wed, 24 Jun 2020 16:44:06 +0200 Subject: [PATCH 4/4] Remove unneeded imports Signed-off-by: Kruno Tomola Fabro --- tests/buffering.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/buffering.rs b/tests/buffering.rs index d444c1b3ed428..b1cac20ee4fcf 100644 --- a/tests/buffering.rs +++ b/tests/buffering.rs @@ -4,11 +4,10 @@ use futures01::{Future, Sink}; use prost::Message; -use std::sync::atomic::Ordering; use tempfile::tempdir; use tracing::trace; use vector::event; -use vector::test_util::{self, next_addr, wait_for, wait_for_atomic_usize}; +use vector::test_util::{self, next_addr, wait_for_atomic_usize}; use vector::topology::{self, config}; use vector::{buffers::BufferConfig, runtime, sinks};