Skip to content

Commit e476e12

Browse files
authored
feat: Refactor 'event.get()' to use path types (vectordotdev#18160)
* feat: Refactor 'get()' to use path types
* vdev fmt
1 parent 3b53bcd commit e476e12

File tree

13 files changed: +124 -59 lines changed

lib/vector-core/src/event/discriminant.rs

+7 -1
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,13 @@ impl Discriminant {
2727
pub fn from_log_event(event: &LogEvent, discriminant_fields: &[impl AsRef<str>]) -> Self {
2828
let values: Vec<Option<Value>> = discriminant_fields
2929
.iter()
30-
.map(|discriminant_field| event.get(discriminant_field.as_ref()).cloned())
30+
.map(|discriminant_field| {
31+
event
32+
.parse_path_and_get_value(discriminant_field.as_ref())
33+
.ok()
34+
.flatten()
35+
.cloned()
36+
})
3137
.collect();
3238
Self { values }
3339
}

lib/vector-core/src/event/log_event.rs

+24 -7
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ use vector_common::{
2020
request_metadata::GetEventCountTags,
2121
EventDataEq,
2222
};
23-
use vrl::path::OwnedTargetPath;
23+
use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError};
2424

2525
use super::{
2626
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
@@ -295,6 +295,16 @@ impl LogEvent {
295295
self.metadata.add_finalizer(finalizer);
296296
}
297297

298+
/// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value.
299+
/// # Errors
300+
/// Will return an error if path parsing failed.
301+
pub fn parse_path_and_get_value(
302+
&self,
303+
path: impl AsRef<str>,
304+
) -> Result<Option<&Value>, PathParseError> {
305+
parse_target_path(path.as_ref()).map(|path| self.get(&path))
306+
}
307+
298308
#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
299309
pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
300310
match key.prefix() {
@@ -439,11 +449,14 @@ impl LogEvent {
439449
let Some(incoming_val) = incoming.remove(field.as_ref()) else {
440450
continue
441451
};
442-
match self.get_mut(field.as_ref()) {
443-
None => {
444-
self.insert(field.as_ref(), incoming_val);
452+
453+
if let Ok(path) = parse_target_path(field.as_ref()) {
454+
match self.get_mut(&path) {
455+
None => {
456+
self.insert(&path, incoming_val);
457+
}
458+
Some(current_val) => current_val.merge(incoming_val),
445459
}
446-
Some(current_val) => current_val.merge(incoming_val),
447460
}
448461
}
449462
self.metadata.merge(incoming.metadata);
@@ -642,7 +655,9 @@ where
642655
type Output = Value;
643656

644657
fn index(&self, key: T) -> &Value {
645-
self.get(key.as_ref())
658+
self.parse_path_and_get_value(key.as_ref())
659+
.ok()
660+
.flatten()
646661
.unwrap_or_else(|| panic!("Key is not found: {:?}", key.as_ref()))
647662
}
648663
}
@@ -654,7 +669,9 @@ where
654669
{
655670
fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
656671
for (k, v) in iter {
657-
self.insert(k.as_ref(), v.into());
672+
if let Ok(path) = parse_target_path(k.as_ref()) {
673+
self.insert(&path, v.into());
674+
}
658675
}
659676
}
660677
}

lib/vector-core/src/event/trace.rs

+13 -2
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ use vector_common::{
77
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
88
EventDataEq,
99
};
10+
use vrl::path::PathParseError;
1011

1112
use super::{
1213
BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata,
@@ -71,13 +72,23 @@ impl TraceEvent {
7172
self.0.as_map().expect("inner value must be a map")
7273
}
7374

75+
/// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value.
76+
/// # Errors
77+
/// Will return an error if path parsing failed.
78+
pub fn parse_path_and_get_value(
79+
&self,
80+
path: impl AsRef<str>,
81+
) -> Result<Option<&Value>, PathParseError> {
82+
self.0.parse_path_and_get_value(path)
83+
}
84+
7485
#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
7586
pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
7687
self.0.get(key)
7788
}
7889

79-
pub fn get_mut(&mut self, key: impl AsRef<str>) -> Option<&mut Value> {
80-
self.0.get_mut(key.as_ref())
90+
pub fn get_mut<'a>(&mut self, key: impl TargetPath<'a>) -> Option<&mut Value> {
91+
self.0.get_mut(key)
8192
}
8293

8394
pub fn contains(&self, key: impl AsRef<str>) -> bool {

src/api/schema/events/trace.rs

+2 -1
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
use async_graphql::Object;
22
use vector_common::encode_logfmt;
3+
use vrl::event_path;
34

45
use super::EventEncodingType;
56
use crate::{event, topology::TapOutput};
@@ -48,7 +49,7 @@ impl Trace {
4849

4950
/// Get JSON field data on the trace event, by field name
5051
async fn json(&self, field: String) -> Option<String> {
51-
self.event.get(field.as_str()).map(|field| {
52+
self.event.get(event_path!(field.as_str())).map(|field| {
5253
serde_json::to_string(field)
5354
.expect("JSON serialization of log event field failed. Please report.")
5455
})

src/conditions/datadog_search.rs

+23 -11
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,12 @@ impl Filter<LogEvent> for EventFilter {
7575
any_string_match("tags", move |value| value == field)
7676
}
7777
Field::Default(f) | Field::Facet(f) | Field::Reserved(f) => {
78-
Run::boxed(move |log: &LogEvent| log.get(f.as_str()).is_some())
78+
Run::boxed(move |log: &LogEvent| {
79+
log.parse_path_and_get_value(f.as_str())
80+
.ok()
81+
.flatten()
82+
.is_some()
83+
})
7984
}
8085
}
8186
}
@@ -165,8 +170,11 @@ impl Filter<LogEvent> for EventFilter {
165170
match field {
166171
// Facets are compared numerically if the value is numeric, or as strings otherwise.
167172
Field::Facet(f) => {
168-
Run::boxed(
169-
move |log: &LogEvent| match (log.get(f.as_str()), &comparison_value) {
173+
Run::boxed(move |log: &LogEvent| {
174+
match (
175+
log.parse_path_and_get_value(f.as_str()).ok().flatten(),
176+
&comparison_value,
177+
) {
170178
// Integers.
171179
(Some(Value::Integer(lhs)), ComparisonValue::Integer(rhs)) => {
172180
match comparator {
@@ -227,8 +235,8 @@ impl Filter<LogEvent> for EventFilter {
227235
}
228236
}
229237
_ => false,
230-
},
231-
)
238+
}
239+
})
232240
}
233241
// Tag values need extracting by "key:value" to be compared.
234242
Field::Tag(tag) => any_string_match("tags", move |value| match value.split_once(':') {
@@ -266,9 +274,11 @@ where
266274
{
267275
let field = field.into();
268276

269-
Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) {
270-
Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)),
271-
_ => false,
277+
Run::boxed(move |log: &LogEvent| {
278+
match log.parse_path_and_get_value(field.as_str()).ok().flatten() {
279+
Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)),
280+
_ => false,
281+
}
272282
})
273283
}
274284

@@ -281,9 +291,11 @@ where
281291
{
282292
let field = field.into();
283293

284-
Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) {
285-
Some(Value::Array(values)) => func(values),
286-
_ => false,
294+
Run::boxed(move |log: &LogEvent| {
295+
match log.parse_path_and_get_value(field.as_str()).ok().flatten() {
296+
Some(Value::Array(values)) => func(values),
297+
_ => false,
298+
}
287299
})
288300
}
289301

src/sinks/datadog/traces/apm_stats/aggregation.rs

+5 -4
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
use std::{collections::BTreeMap, sync::Arc};
22

33
use chrono::Utc;
4+
use vrl::event_path;
45

56
use super::{
67
bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey,
@@ -179,7 +180,7 @@ impl Aggregator {
179180
pub(crate) fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) {
180181
// Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L148-L184
181182

182-
let spans = match trace.get("spans") {
183+
let spans = match trace.get(event_path!("spans")) {
183184
Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(),
184185
_ => vec![],
185186
};
@@ -189,16 +190,16 @@ impl Aggregator {
189190
env: partition_key.env.clone().unwrap_or_default(),
190191
hostname: partition_key.hostname.clone().unwrap_or_default(),
191192
version: trace
192-
.get("app_version")
193+
.get(event_path!("app_version"))
193194
.map(|v| v.to_string_lossy().into_owned())
194195
.unwrap_or_default(),
195196
container_id: trace
196-
.get("container_id")
197+
.get(event_path!("container_id"))
197198
.map(|v| v.to_string_lossy().into_owned())
198199
.unwrap_or_default(),
199200
};
200201
let synthetics = trace
201-
.get("origin")
202+
.get(event_path!("origin"))
202203
.map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS))
203204
.unwrap_or(false);
204205

src/sinks/datadog/traces/request_builder.rs

+14 -13
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ use prost::Message;
1010
use snafu::Snafu;
1111
use vector_common::request_metadata::RequestMetadata;
1212
use vector_core::event::{EventFinalizers, Finalizable};
13+
use vrl::event_path;
1314

1415
use super::{
1516
apm_stats::{compute_apm_stats, Aggregator},
@@ -283,7 +284,7 @@ impl DatadogTracesEncoder {
283284

284285
fn vector_trace_into_dd_tracer_payload(trace: &TraceEvent) -> dd_proto::TracerPayload {
285286
let tags = trace
286-
.get("tags")
287+
.get(event_path!("tags"))
287288
.and_then(|m| m.as_object())
288289
.map(|m| {
289290
m.iter()
@@ -292,7 +293,7 @@ impl DatadogTracesEncoder {
292293
})
293294
.unwrap_or_default();
294295

295-
let spans = match trace.get("spans") {
296+
let spans = match trace.get(event_path!("spans")) {
296297
Some(Value::Array(v)) => v
297298
.iter()
298299
.filter_map(|s| s.as_object().map(DatadogTracesEncoder::convert_span))
@@ -302,19 +303,19 @@ impl DatadogTracesEncoder {
302303

303304
let chunk = dd_proto::TraceChunk {
304305
priority: trace
305-
.get("priority")
306+
.get(event_path!("priority"))
306307
.and_then(|v| v.as_integer().map(|v| v as i32))
307308
// This should not happen for Datadog originated traces, but in case this field is not populated
308309
// we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
309310
// which is what the Datadog trace-agent is doing for OTLP originated traces, as per
310311
// https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309.
311312
.unwrap_or(1i32),
312313
origin: trace
313-
.get("origin")
314+
.get(event_path!("origin"))
314315
.map(|v| v.to_string_lossy().into_owned())
315316
.unwrap_or_default(),
316317
dropped_trace: trace
317-
.get("dropped")
318+
.get(event_path!("dropped"))
318319
.and_then(|v| v.as_boolean())
319320
.unwrap_or(false),
320321
spans,
@@ -323,37 +324,37 @@ impl DatadogTracesEncoder {
323324

324325
dd_proto::TracerPayload {
325326
container_id: trace
326-
.get("container_id")
327+
.get(event_path!("container_id"))
327328
.map(|v| v.to_string_lossy().into_owned())
328329
.unwrap_or_default(),
329330
language_name: trace
330-
.get("language_name")
331+
.get(event_path!("language_name"))
331332
.map(|v| v.to_string_lossy().into_owned())
332333
.unwrap_or_default(),
333334
language_version: trace
334-
.get("language_version")
335+
.get(event_path!("language_version"))
335336
.map(|v| v.to_string_lossy().into_owned())
336337
.unwrap_or_default(),
337338
tracer_version: trace
338-
.get("tracer_version")
339+
.get(event_path!("tracer_version"))
339340
.map(|v| v.to_string_lossy().into_owned())
340341
.unwrap_or_default(),
341342
runtime_id: trace
342-
.get("runtime_id")
343+
.get(event_path!("runtime_id"))
343344
.map(|v| v.to_string_lossy().into_owned())
344345
.unwrap_or_default(),
345346
chunks: vec![chunk],
346347
tags,
347348
env: trace
348-
.get("env")
349+
.get(event_path!("env"))
349350
.map(|v| v.to_string_lossy().into_owned())
350351
.unwrap_or_default(),
351352
hostname: trace
352-
.get("hostname")
353+
.get(event_path!("hostname"))
353354
.map(|v| v.to_string_lossy().into_owned())
354355
.unwrap_or_default(),
355356
app_version: trace
356-
.get("app_version")
357+
.get(event_path!("app_version"))
357358
.map(|v| v.to_string_lossy().into_owned())
358359
.unwrap_or_default(),
359360
}

src/sinks/datadog/traces/sink.rs

+7 -4
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ use futures_util::{
77
};
88
use tokio::sync::oneshot::{channel, Sender};
99
use tower::Service;
10+
use vrl::event_path;
1011
use vrl::path::PathPrefix;
1112

1213
use vector_core::{
@@ -54,19 +55,21 @@ impl Partitioner for EventPartitioner {
5455
}
5556
Event::Trace(t) => PartitionKey {
5657
api_key: item.metadata().datadog_api_key(),
57-
env: t.get("env").map(|s| s.to_string_lossy().into_owned()),
58+
env: t
59+
.get(event_path!("env"))
60+
.map(|s| s.to_string_lossy().into_owned()),
5861
hostname: log_schema().host_key().and_then(|key| {
5962
t.get((PathPrefix::Event, key))
6063
.map(|s| s.to_string_lossy().into_owned())
6164
}),
6265
agent_version: t
63-
.get("agent_version")
66+
.get(event_path!("agent_version"))
6467
.map(|s| s.to_string_lossy().into_owned()),
6568
target_tps: t
66-
.get("target_tps")
69+
.get(event_path!("target_tps"))
6770
.and_then(|tps| tps.as_integer().map(Into::into)),
6871
error_tps: t
69-
.get("error_tps")
72+
.get(event_path!("error_tps"))
7073
.and_then(|tps| tps.as_integer().map(Into::into)),
7174
},
7275
}

src/sinks/elasticsearch/config.rs

+3 -1
Original file line number | Diff line number | Diff line change
@@ -443,7 +443,9 @@ impl DataStreamConfig {
443443
let (dtype, dataset, namespace) = if !self.auto_routing {
444444
(self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
445445
} else {
446-
let data_stream = log.get("data_stream").and_then(|ds| ds.as_object());
446+
let data_stream = log
447+
.get(event_path!("data_stream"))
448+
.and_then(|ds| ds.as_object());
447449
let dtype = data_stream
448450
.and_then(|ds| ds.get("type"))
449451
.map(|value| value.to_string_lossy().into_owned())

0 commit comments

Comments
 (0)