Commit 0bf6abd

chore(observability): count byte_size after transforming event (#17941)
Before this change, `component_sent_event_bytes_total` was emitted using the JSON byte size of the event, including any fields dropped by the `only_fields` and `except_fields` settings. This changes it so that the count is made after transforming the event. A complication arose whereby, if the service field was dropped, we would no longer have access to this value to emit the `service` tag with the metrics. This also adds a `dropped_fields` field to the event metadata where this value can be stored and accessed.

---------

Signed-off-by: Stephen Wakely <[email protected]>
1 parent 7050b7e commit 0bf6abd
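
In practical terms, the size that feeds `component_sent_event_bytes_total` is now measured from the pruned event rather than the original one. A minimal sketch of the ordering, not code from this commit (the helper name and the `crate::codecs::Transformer` import path are assumptions; the telemetry calls mirror the AMQP encoder change below):

use vector_common::request_metadata::GroupedCountByteSize;
use vector_core::{config::telemetry, EstimatedJsonEncodedSizeOf};

use crate::{codecs::Transformer, event::Event};

// Hypothetical helper: measure the grouped byte size only *after* the transformer
// has applied `only_fields` / `except_fields`, so dropped fields no longer
// inflate `component_sent_event_bytes_total`.
fn measure_after_transform(transformer: &Transformer, mut event: Event) -> GroupedCountByteSize {
    transformer.transform(&mut event);

    let mut byte_size = telemetry().create_request_count_byte_size();
    byte_size.add_event(&event, event.estimated_json_encoded_size_of());
    byte_size
}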

File tree

29 files changed: +427 -155 lines changed

lib/vector-core/src/event/log_event.rs

+12-4
@@ -292,11 +292,19 @@ impl LogEvent {
         }
     }
 
+    /// Retrieves the value of a field based on its meaning.
+    /// This will first check if the value has previously been dropped. It is worth being
+    /// aware that if the field has been dropped and then somehow re-added, we still fetch
+    /// the dropped value here.
     pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
-        self.metadata()
-            .schema_definition()
-            .meaning_path(meaning.as_ref())
-            .and_then(|path| self.get(path))
+        if let Some(dropped) = self.metadata().dropped_field(&meaning) {
+            Some(dropped)
+        } else {
+            self.metadata()
+                .schema_definition()
+                .meaning_path(meaning.as_ref())
+                .and_then(|path| self.get(path))
+        }
     }
 
     // TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed.
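
For downstream consumers, the effect is that a lookup by meaning keeps working even after the backing field has been pruned, because `dropped_fields` is consulted first. A hedged usage sketch, not part of the diff (the helper is hypothetical):

use vector_core::{event::LogEvent, schema::meaning};
use vrl::value::Value;

// Hypothetical helper: resolve the service value for metric tags by meaning.
// This now succeeds even if `only_fields` / `except_fields` removed the field itself.
fn service_value(log: &LogEvent) -> Option<Value> {
    log.get_by_meaning(meaning::SERVICE).cloned()
}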

lib/vector-core/src/event/metadata.rs

+22
@@ -46,6 +46,14 @@ pub struct EventMetadata {
     /// TODO(Jean): must not skip serialization to track schemas across restarts.
     #[serde(default = "default_schema_definition", skip)]
     schema_definition: Arc<schema::Definition>,
+
+    /// A store of values that may be dropped during the encoding process but may be needed
+    /// later on. The map is indexed by meaning.
+    /// Currently this is just used for the `service`. If the service field is dropped by `only_fields`
+    /// we need to ensure it is still available later on for emitting metrics tagged by the service.
+    /// This field could almost be keyed by `&'static str`, but because it needs to be deserializable
+    /// we have to use `String`.
+    dropped_fields: BTreeMap<String, Value>,
 }
 
 fn default_metadata_value() -> Value {
@@ -123,6 +131,19 @@ impl EventMetadata {
     pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
         self.secrets.insert(SPLUNK_HEC_TOKEN, secret);
     }
+
+    /// Adds the value to the dropped fields list.
+    /// There is currently no way to remove a field from this list, so if a field is dropped
+    /// and then the field is re-added with a new value - the dropped value will still be
+    /// retrieved.
+    pub fn add_dropped_field(&mut self, meaning: String, value: Value) {
+        self.dropped_fields.insert(meaning, value);
+    }
+
+    /// Fetches the dropped field by meaning.
+    pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
+        self.dropped_fields.get(meaning.as_ref())
+    }
 }
 
 impl Default for EventMetadata {
@@ -134,6 +155,7 @@ impl Default for EventMetadata {
             schema_definition: default_schema_definition(),
             source_id: None,
             upstream_id: None,
+            dropped_fields: BTreeMap::new(),
         }
     }
 }
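
The new metadata API round-trips by meaning. A hedged sketch in isolation (illustrative only, not one of the tests added by this commit):

use vector_core::{event::EventMetadata, schema::meaning};
use vrl::value::Value;

#[test]
fn dropped_field_round_trip() {
    let mut metadata = EventMetadata::default();

    // Store the value under its meaning when the field is pruned from the event...
    metadata.add_dropped_field(meaning::SERVICE.to_string(), Value::from("carrot"));

    // ...and it remains retrievable by that same meaning afterwards.
    assert_eq!(
        metadata.dropped_field(meaning::SERVICE),
        Some(&Value::from("carrot"))
    );
}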

lib/vector-core/src/schema/meaning.rs

+17
@@ -0,0 +1,17 @@
+//! Constants for commonly used semantic meanings.
+
+/// The service typically represents the application that generated the event.
+pub const SERVICE: &str = "service";
+
+/// The main text message of the event.
+pub const MESSAGE: &str = "message";
+
+/// The main timestamp of the event.
+pub const TIMESTAMP: &str = "timestamp";
+
+/// The hostname of the machine where the event was generated.
+pub const HOST: &str = "host";
+
+pub const SOURCE: &str = "source";
+pub const SEVERITY: &str = "severity";
+pub const TRACE_ID: &str = "trace_id";

lib/vector-core/src/schema/mod.rs

+1
@@ -1,4 +1,5 @@
 mod definition;
+pub mod meaning;
 mod requirement;
 
 pub use definition::Definition;

src/codecs/encoding/transformer.rs

+123-6
@@ -12,6 +12,7 @@ use lookup::{
 use serde::{Deserialize, Deserializer};
 use vector_config::configurable_component;
 use vector_core::event::{LogEvent, MaybeAsLogMut};
+use vector_core::schema::meaning;
 use vrl::value::Value;
 
 use crate::{event::Event, serde::skip_serializing_if_default};
@@ -128,20 +129,52 @@ impl Transformer {
 
     fn apply_only_fields(&self, log: &mut LogEvent) {
         if let Some(only_fields) = self.only_fields.as_ref() {
-            let old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
+            let mut old_value = std::mem::replace(log.value_mut(), Value::Object(BTreeMap::new()));
 
             for field in only_fields {
-                if let Some(value) = old_value.get(field) {
-                    log.insert((PathPrefix::Event, field), value.clone());
+                if let Some(value) = old_value.remove(field, true) {
+                    log.insert((PathPrefix::Event, field), value);
+                }
+            }
+
+            // We may need the service field to apply tags to emitted metrics after the log message has been pruned. If there
+            // is a service meaning, we move this value to `dropped_fields` in the metadata.
+            // If the field is still in the new log message after pruning it will have been removed from `old_value` above.
+            let service_path = log
+                .metadata()
+                .schema_definition()
+                .meaning_path(meaning::SERVICE);
+            if let Some(service_path) = service_path {
+                let mut new_log = LogEvent::from(old_value);
+                if let Some(service) = new_log.remove(service_path) {
+                    log.metadata_mut()
+                        .add_dropped_field(meaning::SERVICE.to_string(), service);
                 }
             }
         }
     }
 
     fn apply_except_fields(&self, log: &mut LogEvent) {
+        use lookup::path::TargetPath;
+
         if let Some(except_fields) = self.except_fields.as_ref() {
+            let service_path = log
+                .metadata()
+                .schema_definition()
+                .meaning_path(meaning::SERVICE)
+                .map(|path| path.value_path().to_string());
+
             for field in except_fields {
-                log.remove(field.as_str());
+                let value = log.remove(field.as_str());
+
+                // If we are removing the service field we need to store this in a `dropped_fields` list as we may need to
+                // refer to this later when emitting metrics.
+                if let Some(v) = value {
+                    if matches!(service_path.as_ref(), Some(path) if path == field) {
+                        log.metadata_mut()
+                            .add_dropped_field(meaning::SERVICE.to_string(), v);
                    }
+                }
             }
         }
     }
@@ -213,10 +246,15 @@ pub enum TimestampFormat {
 #[cfg(test)]
 mod tests {
     use indoc::indoc;
-    use vector_core::config::log_schema;
+    use lookup::path::parse_target_path;
+    use vector_common::btreemap;
+    use vector_core::config::{log_schema, LogNamespace};
+    use vrl::value::Kind;
+
+    use crate::config::schema;
 
     use super::*;
-    use std::collections::BTreeMap;
+    use std::{collections::BTreeMap, sync::Arc};
 
     #[test]
     fn serialize() {
@@ -374,4 +412,83 @@ mod tests {
         "#});
         assert!(config.is_err())
     }
+
+    #[test]
+    fn only_fields_with_service() {
+        let transformer: Transformer = toml::from_str(r#"only_fields = ["message"]"#).unwrap();
+        let mut log = LogEvent::default();
+        {
+            log.insert("message", 1);
+            log.insert("thing.service", "carrot");
+        }
+
+        let schema = schema::Definition::new_with_default_metadata(
+            Kind::object(btreemap! {
+                "thing" => Kind::object(btreemap! {
+                    "service" => Kind::bytes(),
+                })
+            }),
+            [LogNamespace::Vector],
+        );
+
+        let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");
+
+        let mut event = Event::from(log);
+
+        event
+            .metadata_mut()
+            .set_schema_definition(&Arc::new(schema));
+
+        transformer.transform(&mut event);
+        assert!(event.as_mut_log().contains("message"));
+
+        // Event no longer contains the service field.
+        assert!(!event.as_mut_log().contains("thing.service"));
+
+        // But we can still get the service by meaning.
+        assert_eq!(
+            &Value::from("carrot"),
+            event.as_log().get_by_meaning("service").unwrap()
+        );
+    }
+
+    #[test]
+    fn except_fields_with_service() {
+        let transformer: Transformer =
+            toml::from_str(r#"except_fields = ["thing.service"]"#).unwrap();
+        let mut log = LogEvent::default();
+        {
+            log.insert("message", 1);
+            log.insert("thing.service", "carrot");
+        }
+
+        let schema = schema::Definition::new_with_default_metadata(
+            Kind::object(btreemap! {
+                "thing" => Kind::object(btreemap! {
+                    "service" => Kind::bytes(),
+                })
+            }),
+            [LogNamespace::Vector],
+        );
+
+        let schema = schema.with_meaning(parse_target_path("thing.service").unwrap(), "service");
+
+        let mut event = Event::from(log);
+
+        event
+            .metadata_mut()
+            .set_schema_definition(&Arc::new(schema));
+
+        transformer.transform(&mut event);
+        assert!(event.as_mut_log().contains("message"));
+
+        // Event no longer contains the service field.
+        assert!(!event.as_mut_log().contains("thing.service"));
+
+        // But we can still get the service by meaning.
+        assert_eq!(
+            &Value::from("carrot"),
+            event.as_log().get_by_meaning("service").unwrap()
+        );
+    }
 }

src/sinks/amqp/encoder.rs

+11-2
@@ -3,6 +3,7 @@ use crate::sinks::prelude::*;
 use bytes::BytesMut;
 use std::io;
 use tokio_util::codec::Encoder as _;
+use vector_core::config::telemetry;
 
 #[derive(Clone, Debug)]
 pub(super) struct AmqpEncoder {
@@ -11,9 +12,17 @@ pub(super) struct AmqpEncoder {
 }
 
 impl encoding::Encoder<Event> for AmqpEncoder {
-    fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result<usize> {
+    fn encode_input(
+        &self,
+        mut input: Event,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
         let mut body = BytesMut::new();
         self.transformer.transform(&mut input);
+
+        let mut byte_size = telemetry().create_request_count_byte_size();
+        byte_size.add_event(&input, input.estimated_json_encoded_size_of());
+
         let mut encoder = self.encoder.clone();
         encoder
             .encode(input, &mut body)
@@ -22,6 +31,6 @@ impl encoding::Encoder<Event> for AmqpEncoder {
         let body = body.freeze();
         write_all(writer, 1, body.as_ref())?;
 
-        Ok(body.len())
+        Ok((body.len(), byte_size))
     }
 }
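
Other `Encoder<Event>` implementations follow the same contract: transform first, measure the grouped byte size, then return it alongside the number of bytes written. A hedged sketch with a hypothetical encoder (not part of this commit; the struct and payload format are made up for illustration):

use std::io;

use vector_common::request_metadata::GroupedCountByteSize;
use vector_core::{config::telemetry, EstimatedJsonEncodedSizeOf};

use crate::{codecs::Transformer, event::Event};

// Hypothetical encoder mirroring the AMQP change above.
struct DebugTextEncoder {
    transformer: Transformer,
}

impl DebugTextEncoder {
    fn encode_input(
        &self,
        mut input: Event,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // Prune fields first, then measure, so the metric reflects what is actually sent.
        self.transformer.transform(&mut input);

        let mut byte_size = telemetry().create_request_count_byte_size();
        byte_size.add_event(&input, input.estimated_json_encoded_size_of());

        // Hypothetical payload: a debug rendering of the transformed event.
        let body = format!("{input:?}\n");
        writer.write_all(body.as_bytes())?;

        Ok((body.len(), byte_size))
    }
}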

src/sinks/azure_blob/test.rs

+19-5
@@ -4,7 +4,8 @@ use codecs::{
     encoding::{Framer, FramingConfig},
     NewlineDelimitedEncoder, TextSerializerConfig,
 };
-use vector_core::partition::Partitioner;
+use vector_common::request_metadata::GroupedCountByteSize;
+use vector_core::{partition::Partitioner, EstimatedJsonEncodedSizeOf};
 
 use super::config::AzureBlobSinkConfig;
 use super::request_builder::AzureBlobRequestOptions;
@@ -68,10 +69,13 @@ fn azure_blob_build_request_without_compression() {
         compression,
     };
 
+    let mut byte_size = GroupedCountByteSize::new_untagged();
+    byte_size.add_event(&log, log.estimated_json_encoded_size_of());
+
     let (metadata, request_metadata_builder, _events) =
         request_options.split_input((key, vec![log]));
 
-    let payload = EncodeResult::uncompressed(Bytes::new());
+    let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
     let request_metadata = request_metadata_builder.build(&payload);
     let request = request_options.build_request(metadata, request_metadata, payload);
 
@@ -112,10 +116,14 @@ fn azure_blob_build_request_with_compression() {
         ),
         compression,
     };
+
+    let mut byte_size = GroupedCountByteSize::new_untagged();
+    byte_size.add_event(&log, log.estimated_json_encoded_size_of());
+
     let (metadata, request_metadata_builder, _events) =
         request_options.split_input((key, vec![log]));
 
-    let payload = EncodeResult::uncompressed(Bytes::new());
+    let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
     let request_metadata = request_metadata_builder.build(&payload);
     let request = request_options.build_request(metadata, request_metadata, payload);
 
@@ -157,10 +165,13 @@ fn azure_blob_build_request_with_time_format() {
         compression,
     };
 
+    let mut byte_size = GroupedCountByteSize::new_untagged();
+    byte_size.add_event(&log, log.estimated_json_encoded_size_of());
+
     let (metadata, request_metadata_builder, _events) =
         request_options.split_input((key, vec![log]));
 
-    let payload = EncodeResult::uncompressed(Bytes::new());
+    let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
     let request_metadata = request_metadata_builder.build(&payload);
     let request = request_options.build_request(metadata, request_metadata, payload);
 
@@ -205,10 +216,13 @@ fn azure_blob_build_request_with_uuid() {
         compression,
     };
 
+    let mut byte_size = GroupedCountByteSize::new_untagged();
+    byte_size.add_event(&log, log.estimated_json_encoded_size_of());
+
     let (metadata, request_metadata_builder, _events) =
         request_options.split_input((key, vec![log]));
 
-    let payload = EncodeResult::uncompressed(Bytes::new());
+    let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
     let request_metadata = request_metadata_builder.build(&payload);
     let request = request_options.build_request(metadata, request_metadata, payload);
 
src/sinks/datadog/logs/config.rs

+8-8
@@ -3,7 +3,7 @@ use std::{convert::TryFrom, sync::Arc};
 use indoc::indoc;
 use tower::ServiceBuilder;
 use vector_config::configurable_component;
-use vector_core::config::proxy::ProxyConfig;
+use vector_core::{config::proxy::ProxyConfig, schema::meaning};
 use vrl::value::Kind;
 
 use super::{service::LogApiRetry, sink::LogSinkBuilder};
@@ -176,13 +176,13 @@ impl SinkConfig for DatadogLogsConfig {
 
     fn input(&self) -> Input {
        let requirement = schema::Requirement::empty()
-            .required_meaning("message", Kind::bytes())
-            .required_meaning("timestamp", Kind::timestamp())
-            .optional_meaning("host", Kind::bytes())
-            .optional_meaning("source", Kind::bytes())
-            .optional_meaning("severity", Kind::bytes())
-            .optional_meaning("service", Kind::bytes())
-            .optional_meaning("trace_id", Kind::bytes());
+            .required_meaning(meaning::MESSAGE, Kind::bytes())
+            .required_meaning(meaning::TIMESTAMP, Kind::timestamp())
+            .optional_meaning(meaning::HOST, Kind::bytes())
+            .optional_meaning(meaning::SOURCE, Kind::bytes())
+            .optional_meaning(meaning::SEVERITY, Kind::bytes())
+            .optional_meaning(meaning::SERVICE, Kind::bytes())
+            .optional_meaning(meaning::TRACE_ID, Kind::bytes());
 
         Input::log().with_schema_requirement(requirement)
     }
