
Commit 4915b42

chore(observability): add tests to sinks for Data Volume tags (vectordotdev#17853)
* Test loki sink for data volume tags
* Add new relic test
* Add elastic search test
* Add splunk logs tests
* Add splunk metrics tests
* Add vector test
* Fix Vector test
* Remove RequestMetadataBuilder::new
* Feedback from Spencer and Kyle
* Typo crept in
* Move function up a little
* Fixed datadog agent test

Signed-off-by: Stephen Wakely <[email protected]>
1 parent: 9458b6c

File tree

14 files changed: +499 −121 lines

lib/vector-common/src/request_metadata.rs

+82 −9
@@ -1,5 +1,7 @@
-use std::collections::HashMap;
-use std::ops::Add;
+use std::{
+    collections::HashMap,
+    ops::{Add, AddAssign},
+};
 
 use crate::{
     internal_event::{
@@ -119,6 +121,22 @@ impl GroupedCountByteSize {
             }
         }
     }
+
+    /// Returns `true` if we are the `Tagged` variant - keeping track of the byte sizes
+    /// grouped by their relevant tags.
+    #[must_use]
+    pub fn is_tagged(&self) -> bool {
+        match self {
+            GroupedCountByteSize::Tagged { .. } => true,
+            GroupedCountByteSize::Untagged { .. } => false,
+        }
+    }
+
+    /// Returns `true` if we are the `Untagged` variant - keeping a single count for all events.
+    #[must_use]
+    pub fn is_untagged(&self) -> bool {
+        !self.is_tagged()
+    }
 }
 
 impl From<CountByteSize> for GroupedCountByteSize {
@@ -127,26 +145,81 @@ impl From<CountByteSize> for GroupedCountByteSize {
     }
 }
 
+impl AddAssign for GroupedCountByteSize {
+    fn add_assign(&mut self, mut rhs: Self) {
+        if self.is_untagged() && rhs.is_tagged() {
+            // First handle the case where we are untagged and assigning to a tagged value.
+            // We need to change `self` and so need to ensure our match doesn't take ownership of the object.
+            *self = match (&self, &mut rhs) {
+                (Self::Untagged { size }, Self::Tagged { sizes }) => {
+                    let mut sizes = std::mem::take(sizes);
+                    match sizes.get_mut(&TaggedEventsSent::new_empty()) {
+                        Some(empty_size) => *empty_size += *size,
+                        None => {
+                            sizes.insert(TaggedEventsSent::new_empty(), *size);
+                        }
+                    }
+
+                    Self::Tagged { sizes }
+                }
+                _ => {
+                    unreachable!()
+                }
+            };
+
+            return;
+        }
+
+        // For these cases, we know we won't have to change `self` so the match can take ownership.
+        match (self, rhs) {
+            (Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(&key) {
+                        Some(size) => *size += value,
+                        None => {
+                            lhs.insert(key.clone(), value);
+                        }
+                    }
+                }
+            }
+
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                *lhs = *lhs + rhs;
+            }
+
+            (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
+                match sizes.get_mut(&TaggedEventsSent::new_empty()) {
+                    Some(empty_size) => *empty_size += size,
+                    None => {
+                        sizes.insert(TaggedEventsSent::new_empty(), size);
+                    }
+                }
+            }
+            (Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(),
+        };
+    }
+}
+
 impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
     type Output = GroupedCountByteSize;
 
     fn add(self, other: &'a Self::Output) -> Self::Output {
         match (self, other) {
-            (Self::Tagged { sizes: mut us }, Self::Tagged { sizes: them }) => {
-                for (key, value) in them {
-                    match us.get_mut(key) {
+            (Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(key) {
                         Some(size) => *size += *value,
                         None => {
-                            us.insert(key.clone(), *value);
+                            lhs.insert(key.clone(), *value);
                        }
                     }
                 }
 
-                Self::Tagged { sizes: us }
+                Self::Tagged { sizes: lhs }
             }
 
-            (Self::Untagged { size: us }, Self::Untagged { size: them }) => {
-                Self::Untagged { size: us + *them }
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                Self::Untagged { size: lhs + *rhs }
             }
 
             // The following two scenarios shouldn't really occur in practice, but are provided for completeness.
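For readers skimming the diff, here is a minimal, self-contained sketch of the same AddAssign merge semantics. The types are illustrative stand-ins, not Vector's: usize replaces CountByteSize, and Option<String> replaces the TaggedEventsSent key, with None playing the role of TaggedEventsSent::new_empty().

use std::collections::HashMap;
use std::ops::AddAssign;

// Stand-ins (illustrative only): `usize` for CountByteSize, `Option<String>`
// for the TaggedEventsSent key; `None` stands in for TaggedEventsSent::new_empty().
#[derive(Debug, PartialEq)]
enum Grouped {
    Untagged { size: usize },
    Tagged { sizes: HashMap<Option<String>, usize> },
}

impl AddAssign for Grouped {
    fn add_assign(&mut self, mut rhs: Self) {
        // Untagged += Tagged is the only case that changes the variant of
        // `self`, so, as in the diff, compute the new value first and assign
        // it only once the match (and its borrows of `self`) has ended.
        if matches!(self, Grouped::Untagged { .. }) && matches!(rhs, Grouped::Tagged { .. }) {
            *self = match (&self, &mut rhs) {
                (Grouped::Untagged { size }, Grouped::Tagged { sizes }) => {
                    let mut sizes = std::mem::take(sizes);
                    // Fold the untagged total under the "empty" tag.
                    *sizes.entry(None).or_insert(0) += *size;
                    Grouped::Tagged { sizes }
                }
                _ => unreachable!(),
            };
            return;
        }

        match (self, rhs) {
            // Tagged += Tagged: merge the per-tag counts.
            (Grouped::Tagged { sizes: lhs }, Grouped::Tagged { sizes: rhs }) => {
                for (key, value) in rhs {
                    *lhs.entry(key).or_insert(0) += value;
                }
            }
            // Untagged += Untagged: plain addition.
            (Grouped::Untagged { size: lhs }, Grouped::Untagged { size: rhs }) => {
                *lhs += rhs;
            }
            // Tagged += Untagged: fold the untagged total under the "empty" tag.
            (Grouped::Tagged { sizes }, Grouped::Untagged { size }) => {
                *sizes.entry(None).or_insert(0) += size;
            }
            // Handled before the match.
            (Grouped::Untagged { .. }, Grouped::Tagged { .. }) => unreachable!(),
        }
    }
}

fn main() {
    let mut total = Grouped::Untagged { size: 10 };
    total += Grouped::Tagged {
        sizes: HashMap::from([(Some("loki".to_string()), 5)]),
    };
    // `total` is now Tagged, with the old untagged 10 under the empty tag.
    assert_eq!(
        total,
        Grouped::Tagged {
            sizes: HashMap::from([(None, 10), (Some("loki".to_string()), 5)]),
        }
    );
}

The sketch uses HashMap::entry for brevity where the diff spells out get_mut/insert; the observable behavior is the same.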

src/sinks/datadog/metrics/encoder.rs

+3 −1
@@ -333,6 +333,8 @@ impl DatadogMetricsEncoder {
         self.state.written += n;
 
         let raw_bytes_written = self.state.written;
+        let byte_size = self.state.byte_size.clone();
+
         // Consume the encoder state so we can do our final checks and return the necessary data.
         let state = self.reset_state();
         let payload = state
@@ -357,7 +359,7 @@ impl DatadogMetricsEncoder {
         if recommended_splits == 1 {
             // "One" split means no splits needed: our payload didn't exceed either of the limits.
             Ok((
-                EncodeResult::compressed(payload, raw_bytes_written, self.state.byte_size.clone()),
+                EncodeResult::compressed(payload, raw_bytes_written, byte_size),
                 processed,
             ))
         } else {
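The point of this fix: reset_state() evidently swaps in a fresh encoder state, so the old code's self.state.byte_size.clone() on the success path read the freshly reset state rather than the sizes of the payload just encoded. A hypothetical minimal illustration of the capture-before-reset pattern, using simplified stand-in types rather than Vector's actual encoder:

// Hypothetical stand-ins; field types are simplified (usize instead of
// GroupedCountByteSize).
#[derive(Default)]
struct EncoderState {
    written: usize,
    byte_size: usize,
}

struct Encoder {
    state: EncoderState,
}

impl Encoder {
    // Swaps in a fresh state and hands back the old one.
    fn reset_state(&mut self) -> EncoderState {
        std::mem::take(&mut self.state)
    }

    fn finish(&mut self) -> (usize, usize) {
        // Capture what we still need *before* the reset; reading
        // `self.state.byte_size` after `reset_state()` would return the
        // zeroed replacement value, which is the bug class fixed above.
        let raw_bytes_written = self.state.written;
        let byte_size = self.state.byte_size;
        let _old_state = self.reset_state();
        (raw_bytes_written, byte_size)
    }
}

fn main() {
    let mut enc = Encoder {
        state: EncoderState { written: 42, byte_size: 128 },
    };
    assert_eq!(enc.finish(), (42, 128));
    assert_eq!(enc.state.byte_size, 0); // the state was reset
}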

src/sinks/datadog/metrics/integration_tests.rs

+51 −29
@@ -4,14 +4,20 @@ use futures::{channel::mpsc::Receiver, stream, StreamExt};
 use hyper::StatusCode;
 use indoc::indoc;
 use rand::{thread_rng, Rng};
-use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue};
+use vector_core::{
+    config::{init_telemetry, Tags, Telemetry},
+    event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue},
+};
 
 use super::DatadogMetricsConfig;
 use crate::{
     config::SinkConfig,
     sinks::util::test::{build_test_server_status, load_sink},
     test_util::{
-        components::{assert_sink_compliance, SINK_TAGS},
+        components::{
+            assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
+            SINK_TAGS,
+        },
         map_event_batch_stream, next_addr,
     },
 };
@@ -168,35 +174,51 @@ async fn smoke() {
     }
 }
 
-#[tokio::test]
-async fn real_endpoint() {
-    assert_sink_compliance(&SINK_TAGS, async {
-        let config = indoc! {r#"
+async fn run_sink() {
+    let config = indoc! {r#"
         default_api_key = "${TEST_DATADOG_API_KEY}"
         default_namespace = "fake.test.integration"
     "#};
-        let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
-        assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
-        let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
-        let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();
-
-        let (sink, _) = config.build(cx).await.unwrap();
-        let (batch, receiver) = BatchNotifier::new_with_receiver();
-        let events: Vec<_> = (0..10)
-            .map(|index| {
-                Event::Metric(Metric::new(
-                    "counter",
-                    MetricKind::Absolute,
-                    MetricValue::Counter {
-                        value: index as f64,
-                    },
-                ))
-            })
-            .collect();
-        let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+    let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
+    assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
+    let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
+    let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();
+
+    let (sink, _) = config.build(cx).await.unwrap();
+    let (batch, receiver) = BatchNotifier::new_with_receiver();
+    let events: Vec<_> = (0..10)
+        .map(|index| {
+            Event::Metric(Metric::new(
+                "counter",
+                MetricKind::Absolute,
+                MetricValue::Counter {
+                    value: index as f64,
+                },
+            ))
+        })
+        .collect();
+    let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+
+    sink.run(stream).await.unwrap();
+    assert_eq!(receiver.await, BatchStatus::Delivered);
+}
 
-        sink.run(stream).await.unwrap();
-        assert_eq!(receiver.await, BatchStatus::Delivered);
-    })
-    .await;
+#[tokio::test]
+async fn real_endpoint() {
+    assert_sink_compliance(&SINK_TAGS, async { run_sink().await }).await;
+}
+
+#[tokio::test]
+async fn data_volume_tags() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
+    assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async { run_sink().await }).await;
 }
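A note on the shape of this change: the sink-driving body (config, build, event generation, run, delivery assertion) moves into a shared run_sink(), so real_endpoint and the new data_volume_tags test differ only in their wrapper. The data-volume test first enables the service and source telemetry tags via init_telemetry and then asserts against DATA_VOLUME_SINK_TAGS with assert_data_volume_sink_compliance. Judging by the commit message, the Loki, New Relic, Elasticsearch, Splunk, and Vector sink tests added in this commit presumably follow the same two-test shape.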

src/sinks/datadog/traces/request_builder.rs

+4 −8
@@ -9,10 +9,7 @@ use bytes::Bytes;
 use prost::Message;
 use snafu::Snafu;
 use vector_common::request_metadata::RequestMetadata;
-use vector_core::{
-    event::{EventFinalizers, Finalizable},
-    EstimatedJsonEncodedSizeOf,
-};
+use vector_core::event::{EventFinalizers, Finalizable};
 
 use super::{
     apm_stats::{compute_apm_stats, Aggregator},
@@ -125,7 +122,6 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
             .for_each(|r| match r {
                 Ok((payload, mut processed)) => {
                     let uncompressed_size = payload.len();
-                    let json_size = processed.estimated_json_encoded_size_of();
                     let metadata = DDTracesMetadata {
                         api_key: key
                             .api_key
@@ -137,14 +133,14 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
                         content_type: "application/x-protobuf".to_string(),
                     };
 
+                    // build RequestMetadata
+                    let builder = RequestMetadataBuilder::from_events(&processed);
+
                     let mut compressor = Compressor::from(self.compression);
                     match compressor.write_all(&payload) {
                         Ok(()) => {
                             let bytes = compressor.into_inner().freeze();
 
-                            // build RequestMetadata
-                            let builder =
-                                RequestMetadataBuilder::new(n, uncompressed_size, json_size);
                             let bytes_len = NonZeroUsize::new(bytes.len())
                                 .expect("payload should never be zero length");
                             let request_metadata = builder.with_request_size(bytes_len);
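The substitution above, RequestMetadataBuilder::from_events(&processed) in place of the removed RequestMetadataBuilder::new(n, uncompressed_size, json_size), matters for this PR because metadata derived from the events themselves can carry byte sizes grouped by the data-volume tags, which a flat pre-computed triple cannot. A rough, hypothetical sketch of that difference, using simplified stand-in types rather than Vector's actual Event or builder:

use std::collections::HashMap;

// Hypothetical stand-ins; not Vector's actual types.
struct Event {
    service: Option<String>, // the kind of tag data-volume metrics group by
    json_size: usize,
}

#[derive(Debug, Default)]
struct MetadataBuilder {
    event_count: usize,
    // Grouped sizes, in the spirit of GroupedCountByteSize::Tagged.
    grouped_json_size: HashMap<Option<String>, usize>,
}

impl MetadataBuilder {
    // In the spirit of from_events: walking the events lets the builder
    // accumulate sizes per tag, instead of accepting flat totals that have
    // already discarded the grouping.
    fn from_events(events: &[Event]) -> Self {
        let mut builder = MetadataBuilder::default();
        for event in events {
            builder.event_count += 1;
            *builder
                .grouped_json_size
                .entry(event.service.clone())
                .or_insert(0) += event.json_size;
        }
        builder
    }
}

fn main() {
    let events = vec![
        Event { service: Some("web".into()), json_size: 10 },
        Event { service: None, json_size: 7 },
    ];
    let builder = MetadataBuilder::from_events(&events);
    assert_eq!(builder.event_count, 2);
    assert_eq!(builder.grouped_json_size[&Some("web".into())], 10);
}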
