Skip to content

Commit d29424d

Browse files
authored
feat: Migrate LogSchema source_type_key to new lookup code (#17947)
This is part of #13033.

Tried this with a config where `source_type_key` is:

* `""` --> no source entry
* not defined --> same as above
* `"foo"` --> `"foo":"demo_logs"`
* `"foo.bar"` --> `"foo":{"bar":"demo_logs"}`
* `"foo.."` --> `Configuration error. error=Invalid field path "foo.."`

The config:

```
data_dir = "/Users/pavlos.rontidis/my_tests/vector"

[log_schema]
source_type_key = "foo"

[sources.source0]
format = "json"
type = "demo_logs"

[sinks.sink0]
inputs = ["source0"]
target = "stdout"
type = "console"

[sinks.sink0.encoding]
codec = "json"
```
1 parent a05542a commit d29424d

File tree

35 files changed

+240
-140
lines changed

35 files changed

+240
-140
lines changed

lib/opentelemetry-proto/src/convert.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ impl ResourceLog {
208208

209209
log_namespace.insert_vector_metadata(
210210
&mut log,
211-
Some(log_schema().source_type_key()),
211+
log_schema().source_type_key(),
212212
path!("source_type"),
213213
Bytes::from_static(SOURCE_NAME.as_bytes()),
214214
);

lib/vector-core/src/config/log_schema.rs

+12-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use lookup::lookup_v2::{parse_target_path, OptionalValuePath};
2-
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath};
1+
use lookup::lookup_v2::OptionalValuePath;
2+
use lookup::{OwnedTargetPath, OwnedValuePath};
33
use once_cell::sync::{Lazy, OnceCell};
44
use vector_config::configurable_component;
5+
use vrl::path::parse_target_path;
56

67
static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
78
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
@@ -60,7 +61,7 @@ pub struct LogSchema {
6061
///
6162
/// This field will be set by the Vector source that the event was created in.
6263
#[serde(default = "LogSchema::default_source_type_key")]
63-
source_type_key: String,
64+
source_type_key: OptionalValuePath,
6465

6566
/// The name of the event field to set the event metadata in.
6667
///
@@ -88,17 +89,15 @@ impl LogSchema {
8889
}
8990

9091
fn default_timestamp_key() -> OptionalValuePath {
91-
OptionalValuePath {
92-
path: Some(owned_value_path!("timestamp")),
93-
}
92+
OptionalValuePath::new("timestamp")
9493
}
9594

9695
fn default_host_key() -> String {
9796
String::from("host")
9897
}
9998

100-
fn default_source_type_key() -> String {
101-
String::from("source_type")
99+
fn default_source_type_key() -> OptionalValuePath {
100+
OptionalValuePath::new("source_type")
102101
}
103102

104103
fn default_metadata_key() -> String {
@@ -126,8 +125,8 @@ impl LogSchema {
126125
&self.host_key
127126
}
128127

129-
pub fn source_type_key(&self) -> &str {
130-
&self.source_type_key
128+
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
129+
self.source_type_key.path.as_ref()
131130
}
132131

133132
pub fn metadata_key(&self) -> &str {
@@ -146,8 +145,8 @@ impl LogSchema {
146145
self.host_key = v;
147146
}
148147

149-
pub fn set_source_type_key(&mut self, v: String) {
150-
self.source_type_key = v;
148+
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
149+
self.source_type_key = OptionalValuePath { path };
151150
}
152151

153152
pub fn set_metadata_key(&mut self, v: String) {
@@ -191,7 +190,7 @@ impl LogSchema {
191190
{
192191
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
193192
} else {
194-
self.set_source_type_key(other.source_type_key().to_string());
193+
self.set_source_type_key(other.source_type_key().cloned());
195194
}
196195
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
197196
&& self.metadata_key() != other.metadata_key()

lib/vector-core/src/config/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ impl LogNamespace {
476476
) {
477477
self.insert_vector_metadata(
478478
log,
479-
Some(log_schema().source_type_key()),
479+
log_schema().source_type_key(),
480480
path!("source_type"),
481481
Bytes::from_static(source_name.as_bytes()),
482482
);
@@ -551,14 +551,15 @@ mod test {
551551
use chrono::Utc;
552552
use lookup::{event_path, owned_value_path, OwnedTargetPath};
553553
use vector_common::btreemap;
554+
use vrl::path::OwnedValuePath;
554555
use vrl::value::Kind;
555556

556557
#[test]
557558
fn test_insert_standard_vector_source_metadata() {
558-
let nested_path = "a.b.c.d";
559+
let nested_path = "a.b.c.d".to_string();
559560

560561
let mut schema = LogSchema::default();
561-
schema.set_source_type_key(nested_path.to_owned());
562+
schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap()));
562563
init_log_schema(schema, false);
563564

564565
let namespace = LogNamespace::Legacy;

lib/vector-core/src/event/log_event.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,10 @@ impl LogEvent {
466466
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
467467
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
468468
// "Global Log Schema" are updated to the new path lookup code
469-
pub fn source_type_path(&self) -> &'static str {
469+
pub fn source_type_path(&self) -> Option<String> {
470470
match self.namespace() {
471-
LogNamespace::Vector => "%vector.source_type",
472-
LogNamespace::Legacy => log_schema().source_type_key(),
471+
LogNamespace::Vector => Some("%vector.source_type".to_string()),
472+
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
473473
}
474474
}
475475

@@ -514,7 +514,9 @@ impl LogEvent {
514514
pub fn get_source_type(&self) -> Option<&Value> {
515515
match self.namespace() {
516516
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
517-
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())),
517+
LogNamespace::Legacy => log_schema()
518+
.source_type_key()
519+
.and_then(|key| self.get((PathPrefix::Event, key))),
518520
}
519521
}
520522
}

lib/vector-core/src/schema/definition.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::{BTreeMap, BTreeSet};
22

33
use crate::config::{log_schema, LegacyKey, LogNamespace};
4-
use lookup::lookup_v2::{parse_value_path, TargetPath};
4+
use lookup::lookup_v2::TargetPath;
55
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
66
use vrl::value::{kind::Collection, Kind};
77

@@ -144,9 +144,7 @@ impl Definition {
144144
#[must_use]
145145
pub fn with_standard_vector_source_metadata(self) -> Self {
146146
self.with_vector_metadata(
147-
parse_value_path(log_schema().source_type_key())
148-
.ok()
149-
.as_ref(),
147+
log_schema().source_type_key(),
150148
&owned_value_path!("source_type"),
151149
Kind::bytes(),
152150
None,

lib/vector-lookup/src/lookup_v2/optional_path.rs

+7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use vector_config::configurable_component;
2+
use vrl::owned_value_path;
23

34
use crate::lookup_v2::PathParseError;
45
use crate::{OwnedTargetPath, OwnedValuePath};
@@ -56,6 +57,12 @@ impl OptionalValuePath {
5657
pub fn none() -> Self {
5758
Self { path: None }
5859
}
60+
61+
pub fn new(path: &str) -> Self {
62+
Self {
63+
path: Some(owned_value_path!(path)),
64+
}
65+
}
5966
}
6067

6168
impl TryFrom<String> for OptionalValuePath {

src/sinks/datadog/events/sink.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
7575
}
7676

7777
if !log.contains("source_type_name") {
78-
log.rename_key(log.source_type_path(), "source_type_name")
78+
if let Some(source_type_path) = log.source_type_path() {
79+
log.rename_key(source_type_path.as_str(), "source_type_name")
80+
}
7981
}
8082

8183
Some(Event::from(log))

src/sinks/influxdb/logs.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,8 @@ impl SinkConfig for InfluxDbLogsConfig {
207207
.source_type_key
208208
.clone()
209209
.and_then(|k| k.path)
210-
.unwrap_or_else(|| {
211-
parse_value_path(log_schema().source_type_key())
212-
.expect("global log_schema.source_type_key to be valid path")
213-
});
210+
.or(log_schema().source_type_key().cloned())
211+
.unwrap();
214212

215213
let sink = InfluxDbLogsSink {
216214
uri,
@@ -280,11 +278,15 @@ impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
280278
self.tags.replace(host_path.clone());
281279
log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key));
282280
}
283-
self.tags.replace(log.source_type_path().to_string());
284-
log.rename_key(
285-
log.source_type_path(),
286-
(PathPrefix::Event, &self.source_type_key),
287-
);
281+
282+
if let Some(source_type_path) = log.source_type_path() {
283+
self.tags.replace(source_type_path.clone());
284+
log.rename_key(
285+
source_type_path.as_str(),
286+
(PathPrefix::Event, &self.source_type_key),
287+
);
288+
}
289+
288290
self.tags.replace("metric_type".to_string());
289291
log.insert("metric_type", "logs");
290292

src/sources/amqp.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ fn populate_event(
278278

279279
log_namespace.insert_vector_metadata(
280280
log,
281-
Some(log_schema().source_type_key()),
281+
log_schema().source_type_key(),
282282
path!("source_type"),
283283
Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
284284
);
@@ -713,7 +713,10 @@ mod integration_test {
713713
trace!("{:?}", log);
714714
assert_eq!(log[log_schema().message_key()], "my message".into());
715715
assert_eq!(log["routing"], routing_key.into());
716-
assert_eq!(log[log_schema().source_type_key()], "amqp".into());
716+
assert_eq!(
717+
log[log_schema().source_type_key().unwrap().to_string()],
718+
"amqp".into()
719+
);
717720
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
718721
.as_timestamp()
719722
.unwrap();

src/sources/aws_kinesis_firehose/handlers.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub(super) async fn firehose(
9393
if let Event::Log(ref mut log) = event {
9494
log_namespace.insert_vector_metadata(
9595
log,
96-
Some(log_schema().source_type_key()),
96+
log_schema().source_type_key(),
9797
path!("source_type"),
9898
Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
9999
);

src/sources/aws_s3/sqs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ fn handle_single_log(
691691

692692
log_namespace.insert_vector_metadata(
693693
log,
694-
Some(log_schema().source_type_key()),
694+
log_schema().source_type_key(),
695695
path!("source_type"),
696696
Bytes::from_static(AwsS3Config::NAME.as_bytes()),
697697
);

src/sources/datadog_agent/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ pub struct ApiKeyQueryParams {
284284
pub(crate) struct DatadogAgentSource {
285285
pub(crate) api_key_extractor: ApiKeyExtractor,
286286
pub(crate) log_schema_host_key: &'static str,
287-
pub(crate) log_schema_source_type_key: &'static str,
287+
pub(crate) log_schema_source_type_key: String,
288288
pub(crate) log_namespace: LogNamespace,
289289
pub(crate) decoder: Decoder,
290290
protocol: &'static str,
@@ -334,7 +334,9 @@ impl DatadogAgentSource {
334334
.expect("static regex always compiles"),
335335
},
336336
log_schema_host_key: log_schema().host_key(),
337-
log_schema_source_type_key: log_schema().source_type_key(),
337+
log_schema_source_type_key: log_schema()
338+
.source_type_key()
339+
.map_or("".to_string(), |key| key.to_string()),
338340
decoder,
339341
protocol,
340342
logs_schema_definition: Arc::new(logs_schema_definition),

src/sources/datadog_agent/tests.rs

+32-8
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,10 @@ async fn full_payload_v1() {
238238
assert_eq!(log["ddsource"], "curl".into());
239239
assert_eq!(log["ddtags"], "one,two,three".into());
240240
assert!(event.metadata().datadog_api_key().is_none());
241-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
241+
assert_eq!(
242+
log[log_schema().source_type_key().unwrap().to_string()],
243+
"datadog_agent".into()
244+
);
242245
assert_eq!(
243246
event.metadata().schema_definition(),
244247
&test_logs_schema_definition()
@@ -300,7 +303,10 @@ async fn full_payload_v2() {
300303
assert_eq!(log["ddsource"], "curl".into());
301304
assert_eq!(log["ddtags"], "one,two,three".into());
302305
assert!(event.metadata().datadog_api_key().is_none());
303-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
306+
assert_eq!(
307+
log[log_schema().source_type_key().unwrap().to_string()],
308+
"datadog_agent".into()
309+
);
304310
assert_eq!(
305311
event.metadata().schema_definition(),
306312
&test_logs_schema_definition()
@@ -362,7 +368,10 @@ async fn no_api_key() {
362368
assert_eq!(log["ddsource"], "curl".into());
363369
assert_eq!(log["ddtags"], "one,two,three".into());
364370
assert!(event.metadata().datadog_api_key().is_none());
365-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
371+
assert_eq!(
372+
log[log_schema().source_type_key().unwrap().to_string()],
373+
"datadog_agent".into()
374+
);
366375
assert_eq!(
367376
event.metadata().schema_definition(),
368377
&test_logs_schema_definition()
@@ -423,7 +432,10 @@ async fn api_key_in_url() {
423432
assert_eq!(log["service"], "vector".into());
424433
assert_eq!(log["ddsource"], "curl".into());
425434
assert_eq!(log["ddtags"], "one,two,three".into());
426-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
435+
assert_eq!(
436+
log[log_schema().source_type_key().unwrap().to_string()],
437+
"datadog_agent".into()
438+
);
427439
assert_eq!(
428440
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
429441
"12345678abcdefgh12345678abcdefgh"
@@ -488,7 +500,10 @@ async fn api_key_in_query_params() {
488500
assert_eq!(log["service"], "vector".into());
489501
assert_eq!(log["ddsource"], "curl".into());
490502
assert_eq!(log["ddtags"], "one,two,three".into());
491-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
503+
assert_eq!(
504+
log[log_schema().source_type_key().unwrap().to_string()],
505+
"datadog_agent".into()
506+
);
492507
assert_eq!(
493508
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
494509
"12345678abcdefgh12345678abcdefgh"
@@ -559,7 +574,10 @@ async fn api_key_in_header() {
559574
assert_eq!(log["service"], "vector".into());
560575
assert_eq!(log["ddsource"], "curl".into());
561576
assert_eq!(log["ddtags"], "one,two,three".into());
562-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
577+
assert_eq!(
578+
log[log_schema().source_type_key().unwrap().to_string()],
579+
"datadog_agent".into()
580+
);
563581
assert_eq!(
564582
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
565583
"12345678abcdefgh12345678abcdefgh"
@@ -706,7 +724,10 @@ async fn ignores_api_key() {
706724
assert_eq!(log["service"], "vector".into());
707725
assert_eq!(log["ddsource"], "curl".into());
708726
assert_eq!(log["ddtags"], "one,two,three".into());
709-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
727+
assert_eq!(
728+
log[log_schema().source_type_key().unwrap().to_string()],
729+
"datadog_agent".into()
730+
);
710731
assert!(event.metadata().datadog_api_key().is_none());
711732
assert_eq!(
712733
event.metadata().schema_definition(),
@@ -1398,7 +1419,10 @@ async fn split_outputs() {
13981419
assert_eq!(log["service"], "vector".into());
13991420
assert_eq!(log["ddsource"], "curl".into());
14001421
assert_eq!(log["ddtags"], "one,two,three".into());
1401-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
1422+
assert_eq!(
1423+
log[log_schema().source_type_key().unwrap().to_string()],
1424+
"datadog_agent".into()
1425+
);
14021426
assert_eq!(
14031427
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
14041428
"12345678abcdefgh12345678abcdefgh"

src/sources/datadog_agent/traces.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ fn handle_dd_trace_payload_v1(
142142
.set_datadog_api_key(Arc::clone(k));
143143
}
144144
trace_event.insert(
145-
source.log_schema_source_type_key,
145+
source.log_schema_source_type_key.as_str(),
146146
Bytes::from("datadog_agent"),
147147
);
148148
trace_event.insert("payload_version", "v2".to_string());
@@ -255,7 +255,7 @@ fn handle_dd_trace_payload_v0(
255255
trace_event.insert("language_name", lang.clone());
256256
}
257257
trace_event.insert(
258-
source.log_schema_source_type_key,
258+
source.log_schema_source_type_key.as_str(),
259259
Bytes::from("datadog_agent"),
260260
);
261261
trace_event.insert("payload_version", "v1".to_string());

0 commit comments

Comments
 (0)