Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Refactor 'event.get()' to use path types #18160

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/vector-core/src/event/discriminant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ impl Discriminant {
pub fn from_log_event(event: &LogEvent, discriminant_fields: &[impl AsRef<str>]) -> Self {
let values: Vec<Option<Value>> = discriminant_fields
.iter()
.map(|discriminant_field| event.get(discriminant_field.as_ref()).cloned())
.map(|discriminant_field| {
event
.parse_path_and_get_value(discriminant_field.as_ref())
.ok()
.flatten()
.cloned()
})
.collect();
Self { values }
}
Expand Down
31 changes: 24 additions & 7 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use vector_common::{
request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::OwnedTargetPath;
use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError};

use super::{
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
Expand Down Expand Up @@ -295,6 +295,16 @@ impl LogEvent {
self.metadata.add_finalizer(finalizer);
}

/// Look up the value located at `path`, first parsing the string into a
/// target path.
///
/// Returns `Ok(None)` when the path parses successfully but no value exists
/// at that location.
///
/// # Errors
/// Returns a `PathParseError` when `path` is not a valid target path.
pub fn parse_path_and_get_value(
&self,
path: impl AsRef<str>,
) -> Result<Option<&Value>, PathParseError> {
let target_path = parse_target_path(path.as_ref())?;
Ok(self.get(&target_path))
}

#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
match key.prefix() {
Expand Down Expand Up @@ -439,11 +449,14 @@ impl LogEvent {
let Some(incoming_val) = incoming.remove(field.as_ref()) else {
continue
};
match self.get_mut(field.as_ref()) {
None => {
self.insert(field.as_ref(), incoming_val);

if let Ok(path) = parse_target_path(field.as_ref()) {
match self.get_mut(&path) {
None => {
self.insert(&path, incoming_val);
}
Some(current_val) => current_val.merge(incoming_val),
}
Some(current_val) => current_val.merge(incoming_val),
}
}
self.metadata.merge(incoming.metadata);
Expand Down Expand Up @@ -642,7 +655,9 @@ where
type Output = Value;

fn index(&self, key: T) -> &Value {
self.get(key.as_ref())
self.parse_path_and_get_value(key.as_ref())
.ok()
.flatten()
.unwrap_or_else(|| panic!("Key is not found: {:?}", key.as_ref()))
}
}
Expand All @@ -654,7 +669,9 @@ where
{
fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
for (k, v) in iter {
self.insert(k.as_ref(), v.into());
if let Ok(path) = parse_target_path(k.as_ref()) {
self.insert(&path, v.into());
}
}
}
}
Expand Down
15 changes: 13 additions & 2 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vector_common::{
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
EventDataEq,
};
use vrl::path::PathParseError;

use super::{
BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata,
Expand Down Expand Up @@ -71,13 +72,23 @@ impl TraceEvent {
self.0.as_map().expect("inner value must be a map")
}

/// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value.
///
/// Delegates to the inner event's method of the same name (`self.0`).
/// # Errors
/// Will return an error if path parsing failed.
pub fn parse_path_and_get_value(
&self,
path: impl AsRef<str>,
) -> Result<Option<&Value>, PathParseError> {
self.0.parse_path_and_get_value(path)
}

/// Returns a reference to the value at `key`, if one exists, by delegating
/// to the inner event's `get`.
#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
self.0.get(key)
}

pub fn get_mut(&mut self, key: impl AsRef<str>) -> Option<&mut Value> {
self.0.get_mut(key.as_ref())
pub fn get_mut<'a>(&mut self, key: impl TargetPath<'a>) -> Option<&mut Value> {
self.0.get_mut(key)
}

pub fn contains(&self, key: impl AsRef<str>) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion src/api/schema/events/trace.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_graphql::Object;
use vector_common::encode_logfmt;
use vrl::event_path;

use super::EventEncodingType;
use crate::{event, topology::TapOutput};
Expand Down Expand Up @@ -48,7 +49,7 @@ impl Trace {

/// Get JSON field data on the trace event, by field name
async fn json(&self, field: String) -> Option<String> {
self.event.get(field.as_str()).map(|field| {
self.event.get(event_path!(field.as_str())).map(|field| {
serde_json::to_string(field)
.expect("JSON serialization of log event field failed. Please report.")
})
Expand Down
34 changes: 23 additions & 11 deletions src/conditions/datadog_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ impl Filter<LogEvent> for EventFilter {
any_string_match("tags", move |value| value == field)
}
Field::Default(f) | Field::Facet(f) | Field::Reserved(f) => {
Run::boxed(move |log: &LogEvent| log.get(f.as_str()).is_some())
Run::boxed(move |log: &LogEvent| {
log.parse_path_and_get_value(f.as_str())
.ok()
.flatten()
.is_some()
})
}
}
}
Expand Down Expand Up @@ -165,8 +170,11 @@ impl Filter<LogEvent> for EventFilter {
match field {
// Facets are compared numerically if the value is numeric, or as strings otherwise.
Field::Facet(f) => {
Run::boxed(
move |log: &LogEvent| match (log.get(f.as_str()), &comparison_value) {
Run::boxed(move |log: &LogEvent| {
match (
log.parse_path_and_get_value(f.as_str()).ok().flatten(),
&comparison_value,
) {
// Integers.
(Some(Value::Integer(lhs)), ComparisonValue::Integer(rhs)) => {
match comparator {
Expand Down Expand Up @@ -227,8 +235,8 @@ impl Filter<LogEvent> for EventFilter {
}
}
_ => false,
},
)
}
})
}
// Tag values need extracting by "key:value" to be compared.
Field::Tag(tag) => any_string_match("tags", move |value| match value.split_once(':') {
Expand Down Expand Up @@ -266,9 +274,11 @@ where
{
let field = field.into();

Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) {
Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)),
_ => false,
Run::boxed(move |log: &LogEvent| {
match log.parse_path_and_get_value(field.as_str()).ok().flatten() {
Some(Value::Bytes(v)) => func(String::from_utf8_lossy(v)),
_ => false,
}
})
}

Expand All @@ -281,9 +291,11 @@ where
{
let field = field.into();

Run::boxed(move |log: &LogEvent| match log.get(field.as_str()) {
Some(Value::Array(values)) => func(values),
_ => false,
Run::boxed(move |log: &LogEvent| {
match log.parse_path_and_get_value(field.as_str()).ok().flatten() {
Some(Value::Array(values)) => func(values),
_ => false,
}
})
}

Expand Down
9 changes: 5 additions & 4 deletions src/sinks/datadog/traces/apm_stats/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};

use chrono::Utc;
use vrl::event_path;

use super::{
bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey,
Expand Down Expand Up @@ -179,7 +180,7 @@ impl Aggregator {
pub(crate) fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) {
// Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L148-L184

let spans = match trace.get("spans") {
let spans = match trace.get(event_path!("spans")) {
Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(),
_ => vec![],
};
Expand All @@ -189,16 +190,16 @@ impl Aggregator {
env: partition_key.env.clone().unwrap_or_default(),
hostname: partition_key.hostname.clone().unwrap_or_default(),
version: trace
.get("app_version")
.get(event_path!("app_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
container_id: trace
.get("container_id")
.get(event_path!("container_id"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
};
let synthetics = trace
.get("origin")
.get(event_path!("origin"))
.map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS))
.unwrap_or(false);

Expand Down
27 changes: 14 additions & 13 deletions src/sinks/datadog/traces/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use prost::Message;
use snafu::Snafu;
use vector_common::request_metadata::RequestMetadata;
use vector_core::event::{EventFinalizers, Finalizable};
use vrl::event_path;

use super::{
apm_stats::{compute_apm_stats, Aggregator},
Expand Down Expand Up @@ -283,7 +284,7 @@ impl DatadogTracesEncoder {

fn vector_trace_into_dd_tracer_payload(trace: &TraceEvent) -> dd_proto::TracerPayload {
let tags = trace
.get("tags")
.get(event_path!("tags"))
.and_then(|m| m.as_object())
.map(|m| {
m.iter()
Expand All @@ -292,7 +293,7 @@ impl DatadogTracesEncoder {
})
.unwrap_or_default();

let spans = match trace.get("spans") {
let spans = match trace.get(event_path!("spans")) {
Some(Value::Array(v)) => v
.iter()
.filter_map(|s| s.as_object().map(DatadogTracesEncoder::convert_span))
Expand All @@ -302,19 +303,19 @@ impl DatadogTracesEncoder {

let chunk = dd_proto::TraceChunk {
priority: trace
.get("priority")
.get(event_path!("priority"))
.and_then(|v| v.as_integer().map(|v| v as i32))
// This should not happen for Datadog originated traces, but in case this field is not populated
// we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
// which is what the Datadog trace-agent is doing for OTLP originated traces, as per
// https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309.
.unwrap_or(1i32),
origin: trace
.get("origin")
.get(event_path!("origin"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
dropped_trace: trace
.get("dropped")
.get(event_path!("dropped"))
.and_then(|v| v.as_boolean())
.unwrap_or(false),
spans,
Expand All @@ -323,37 +324,37 @@ impl DatadogTracesEncoder {

dd_proto::TracerPayload {
container_id: trace
.get("container_id")
.get(event_path!("container_id"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
language_name: trace
.get("language_name")
.get(event_path!("language_name"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
language_version: trace
.get("language_version")
.get(event_path!("language_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
tracer_version: trace
.get("tracer_version")
.get(event_path!("tracer_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
runtime_id: trace
.get("runtime_id")
.get(event_path!("runtime_id"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
chunks: vec![chunk],
tags,
env: trace
.get("env")
.get(event_path!("env"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
hostname: trace
.get("hostname")
.get(event_path!("hostname"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
app_version: trace
.get("app_version")
.get(event_path!("app_version"))
.map(|v| v.to_string_lossy().into_owned())
.unwrap_or_default(),
}
Expand Down
11 changes: 7 additions & 4 deletions src/sinks/datadog/traces/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures_util::{
};
use tokio::sync::oneshot::{channel, Sender};
use tower::Service;
use vrl::event_path;
use vrl::path::PathPrefix;

use vector_core::{
Expand Down Expand Up @@ -54,19 +55,21 @@ impl Partitioner for EventPartitioner {
}
Event::Trace(t) => PartitionKey {
api_key: item.metadata().datadog_api_key(),
env: t.get("env").map(|s| s.to_string_lossy().into_owned()),
env: t
.get(event_path!("env"))
.map(|s| s.to_string_lossy().into_owned()),
hostname: log_schema().host_key().and_then(|key| {
t.get((PathPrefix::Event, key))
.map(|s| s.to_string_lossy().into_owned())
}),
agent_version: t
.get("agent_version")
.get(event_path!("agent_version"))
.map(|s| s.to_string_lossy().into_owned()),
target_tps: t
.get("target_tps")
.get(event_path!("target_tps"))
.and_then(|tps| tps.as_integer().map(Into::into)),
error_tps: t
.get("error_tps")
.get(event_path!("error_tps"))
.and_then(|tps| tps.as_integer().map(Into::into)),
},
}
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,9 @@ impl DataStreamConfig {
let (dtype, dataset, namespace) = if !self.auto_routing {
(self.dtype(log)?, self.dataset(log)?, self.namespace(log)?)
} else {
let data_stream = log.get("data_stream").and_then(|ds| ds.as_object());
let data_stream = log
.get(event_path!("data_stream"))
.and_then(|ds| ds.as_object());
let dtype = data_stream
.and_then(|ds| ds.get("type"))
.map(|value| value.to_string_lossy().into_owned())
Expand Down
Loading