Skip to content

Commit dd6dddc

Browse files
committed
feat(core): Migrate LogSchema source_type_key to new lookup code
1 parent bc5822c commit dd6dddc

File tree

35 files changed

+167
-131
lines changed

35 files changed

+167
-131
lines changed

Cargo.toml

+1
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,7 @@ members = [
116116

117117
[workspace.dependencies]
118118
vrl = { version = "0.5.0", features = ["cli", "test", "test_framework", "arbitrary"] }
119+
#vrl = { path = "../vrl", features = ["cli", "test", "test_framework", "arbitrary"] }
119120

120121
[dependencies]
121122
vrl.workspace = true

lib/opentelemetry-proto/src/convert.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -208,7 +208,7 @@ impl ResourceLog {
208208

209209
log_namespace.insert_vector_metadata(
210210
&mut log,
211-
Some(log_schema().source_type_key()),
211+
log_schema().source_type_key(),
212212
path!("source_type"),
213213
Bytes::from_static(SOURCE_NAME.as_bytes()),
214214
);

lib/vector-core/src/config/log_schema.rs

+12-13
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
1-
use lookup::lookup_v2::{parse_target_path, OptionalValuePath};
2-
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath};
1+
use lookup::lookup_v2::{OptionalValuePath};
2+
use lookup::{OwnedTargetPath, OwnedValuePath};
33
use once_cell::sync::{Lazy, OnceCell};
4+
use vrl::path::parse_target_path;
45
use vector_config::configurable_component;
56

67
static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
@@ -60,7 +61,7 @@ pub struct LogSchema {
6061
///
6162
/// This field will be set by the Vector source that the event was created in.
6263
#[serde(default = "LogSchema::default_source_type_key")]
63-
source_type_key: String,
64+
source_type_key: OptionalValuePath,
6465

6566
/// The name of the event field to set the event metadata in.
6667
///
@@ -88,17 +89,15 @@ impl LogSchema {
8889
}
8990

9091
fn default_timestamp_key() -> OptionalValuePath {
91-
OptionalValuePath {
92-
path: Some(owned_value_path!("timestamp")),
93-
}
92+
OptionalValuePath::new("timestamp")
9493
}
9594

9695
fn default_host_key() -> String {
9796
String::from("host")
9897
}
9998

100-
fn default_source_type_key() -> String {
101-
String::from("source_type")
99+
fn default_source_type_key() -> OptionalValuePath {
100+
OptionalValuePath::new("source_type")
102101
}
103102

104103
fn default_metadata_key() -> String {
@@ -126,8 +125,8 @@ impl LogSchema {
126125
&self.host_key
127126
}
128127

129-
pub fn source_type_key(&self) -> &str {
130-
&self.source_type_key
128+
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
129+
self.source_type_key.path.as_ref()
131130
}
132131

133132
pub fn metadata_key(&self) -> &str {
@@ -146,8 +145,8 @@ impl LogSchema {
146145
self.host_key = v;
147146
}
148147

149-
pub fn set_source_type_key(&mut self, v: String) {
150-
self.source_type_key = v;
148+
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
149+
self.source_type_key = OptionalValuePath { path };
151150
}
152151

153152
pub fn set_metadata_key(&mut self, v: String) {
@@ -191,7 +190,7 @@ impl LogSchema {
191190
{
192191
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
193192
} else {
194-
self.set_source_type_key(other.source_type_key().to_string());
193+
self.set_source_type_key(other.source_type_key().cloned());
195194
}
196195
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
197196
&& self.metadata_key() != other.metadata_key()

lib/vector-core/src/config/mod.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -476,7 +476,7 @@ impl LogNamespace {
476476
) {
477477
self.insert_vector_metadata(
478478
log,
479-
Some(log_schema().source_type_key()),
479+
log_schema().source_type_key(),
480480
path!("source_type"),
481481
Bytes::from_static(source_name.as_bytes()),
482482
);

lib/vector-core/src/event/log_event.rs

+6-4
Original file line number | Diff line number | Diff line change
@@ -466,10 +466,10 @@ impl LogEvent {
466466
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
467467
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
468468
// "Global Log Schema" are updated to the new path lookup code
469-
pub fn source_type_path(&self) -> &'static str {
469+
pub fn source_type_path(&self) -> Option<String> {
470470
match self.namespace() {
471-
LogNamespace::Vector => "%vector.source_type",
472-
LogNamespace::Legacy => log_schema().source_type_key(),
471+
LogNamespace::Vector => Some("%vector.source_type".to_string()),
472+
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
473473
}
474474
}
475475

@@ -514,7 +514,9 @@ impl LogEvent {
514514
pub fn get_source_type(&self) -> Option<&Value> {
515515
match self.namespace() {
516516
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
517-
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())),
517+
LogNamespace::Legacy => log_schema()
518+
.source_type_key()
519+
.and_then(|key| self.get((PathPrefix::Event, key))),
518520
}
519521
}
520522
}

lib/vector-core/src/schema/definition.rs

+2-4
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
use std::collections::{BTreeMap, BTreeSet};
22

33
use crate::config::{log_schema, LegacyKey, LogNamespace};
4-
use lookup::lookup_v2::{parse_value_path, TargetPath};
4+
use lookup::lookup_v2::TargetPath;
55
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
66
use vrl::value::{kind::Collection, Kind};
77

@@ -144,9 +144,7 @@ impl Definition {
144144
#[must_use]
145145
pub fn with_standard_vector_source_metadata(self) -> Self {
146146
self.with_vector_metadata(
147-
parse_value_path(log_schema().source_type_key())
148-
.ok()
149-
.as_ref(),
147+
log_schema().source_type_key(),
150148
&owned_value_path!("source_type"),
151149
Kind::bytes(),
152150
None,

lib/vector-lookup/src/lookup_v2/optional_path.rs

+5
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
use vrl::owned_value_path;
12
use vector_config::configurable_component;
23

34
use crate::lookup_v2::PathParseError;
@@ -56,6 +57,10 @@ impl OptionalValuePath {
5657
pub fn none() -> Self {
5758
Self { path: None }
5859
}
60+
61+
pub fn new(path: &str) -> Self {
62+
Self { path: Some(owned_value_path!(path)) }
63+
}
5964
}
6065

6166
impl TryFrom<String> for OptionalValuePath {

src/sinks/datadog/events/sink.rs

+3-1
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
7575
}
7676

7777
if !log.contains("source_type_name") {
78-
log.rename_key(log.source_type_path(), "source_type_name")
78+
if let Some(source_type_path) = log.source_type_path() {
79+
log.rename_key(source_type_path.as_str(), "source_type_name")
80+
}
7981
}
8082

8183
Some(Event::from(log))

src/sinks/influxdb/logs.rs

+13-10
Original file line number | Diff line number | Diff line change
@@ -206,11 +206,7 @@ impl SinkConfig for InfluxDbLogsConfig {
206206
let source_type_key = self
207207
.source_type_key
208208
.clone()
209-
.and_then(|k| k.path)
210-
.unwrap_or_else(|| {
211-
parse_value_path(log_schema().source_type_key())
212-
.expect("global log_schema.source_type_key to be valid path")
213-
});
209+
.and_then(|k| k.path).or(log_schema().source_type_key().cloned()).unwrap();
214210

215211
let sink = InfluxDbLogsSink {
216212
uri,
@@ -280,11 +276,18 @@ impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
280276
self.tags.replace(host_path.clone());
281277
log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key));
282278
}
283-
self.tags.replace(log.source_type_path().to_string());
284-
log.rename_key(
285-
log.source_type_path(),
286-
(PathPrefix::Event, &self.source_type_key),
287-
);
279+
280+
if let Some(source_type_path) = log.source_type_path() {
281+
self.tags.replace(source_type_path);
282+
}
283+
284+
if let Some(source_type_key) = log_schema().source_type_key() {
285+
log.rename_key(
286+
(PathPrefix::Event, source_type_key),
287+
(PathPrefix::Event, &self.source_type_key),
288+
);
289+
}
290+
288291
self.tags.replace("metric_type".to_string());
289292
log.insert("metric_type", "logs");
290293

src/sources/amqp.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -278,7 +278,7 @@ fn populate_event(
278278

279279
log_namespace.insert_vector_metadata(
280280
log,
281-
Some(log_schema().source_type_key()),
281+
log_schema().source_type_key(),
282282
path!("source_type"),
283283
Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
284284
);
@@ -713,7 +713,7 @@ mod integration_test {
713713
trace!("{:?}", log);
714714
assert_eq!(log[log_schema().message_key()], "my message".into());
715715
assert_eq!(log["routing"], routing_key.into());
716-
assert_eq!(log[log_schema().source_type_key()], "amqp".into());
716+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "amqp".into());
717717
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
718718
.as_timestamp()
719719
.unwrap();

src/sources/aws_kinesis_firehose/handlers.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -93,7 +93,7 @@ pub(super) async fn firehose(
9393
if let Event::Log(ref mut log) = event {
9494
log_namespace.insert_vector_metadata(
9595
log,
96-
Some(log_schema().source_type_key()),
96+
log_schema().source_type_key(),
9797
path!("source_type"),
9898
Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
9999
);

src/sources/aws_s3/sqs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ fn handle_single_log(
691691

692692
log_namespace.insert_vector_metadata(
693693
log,
694-
Some(log_schema().source_type_key()),
694+
log_schema().source_type_key(),
695695
path!("source_type"),
696696
Bytes::from_static(AwsS3Config::NAME.as_bytes()),
697697
);

src/sources/datadog_agent/mod.rs

+2-1
Original file line number | Diff line number | Diff line change
@@ -334,7 +334,8 @@ impl DatadogAgentSource {
334334
.expect("static regex always compiles"),
335335
},
336336
log_schema_host_key: log_schema().host_key(),
337-
log_schema_source_type_key: log_schema().source_type_key(),
337+
log_schema_source_type_key: log_schema().source_type_key()
338+
.map_or("", |key| Box::leak(key.to_string().into_boxed_str())),
338339
decoder,
339340
protocol,
340341
logs_schema_definition: Arc::new(logs_schema_definition),

src/sources/datadog_agent/tests.rs

+8-8
Original file line number | Diff line number | Diff line change
@@ -238,7 +238,7 @@ async fn full_payload_v1() {
238238
assert_eq!(log["ddsource"], "curl".into());
239239
assert_eq!(log["ddtags"], "one,two,three".into());
240240
assert!(event.metadata().datadog_api_key().is_none());
241-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
241+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
242242
assert_eq!(
243243
event.metadata().schema_definition(),
244244
&test_logs_schema_definition()
@@ -300,7 +300,7 @@ async fn full_payload_v2() {
300300
assert_eq!(log["ddsource"], "curl".into());
301301
assert_eq!(log["ddtags"], "one,two,three".into());
302302
assert!(event.metadata().datadog_api_key().is_none());
303-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
303+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
304304
assert_eq!(
305305
event.metadata().schema_definition(),
306306
&test_logs_schema_definition()
@@ -362,7 +362,7 @@ async fn no_api_key() {
362362
assert_eq!(log["ddsource"], "curl".into());
363363
assert_eq!(log["ddtags"], "one,two,three".into());
364364
assert!(event.metadata().datadog_api_key().is_none());
365-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
365+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
366366
assert_eq!(
367367
event.metadata().schema_definition(),
368368
&test_logs_schema_definition()
@@ -423,7 +423,7 @@ async fn api_key_in_url() {
423423
assert_eq!(log["service"], "vector".into());
424424
assert_eq!(log["ddsource"], "curl".into());
425425
assert_eq!(log["ddtags"], "one,two,three".into());
426-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
426+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
427427
assert_eq!(
428428
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
429429
"12345678abcdefgh12345678abcdefgh"
@@ -488,7 +488,7 @@ async fn api_key_in_query_params() {
488488
assert_eq!(log["service"], "vector".into());
489489
assert_eq!(log["ddsource"], "curl".into());
490490
assert_eq!(log["ddtags"], "one,two,three".into());
491-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
491+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
492492
assert_eq!(
493493
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
494494
"12345678abcdefgh12345678abcdefgh"
@@ -559,7 +559,7 @@ async fn api_key_in_header() {
559559
assert_eq!(log["service"], "vector".into());
560560
assert_eq!(log["ddsource"], "curl".into());
561561
assert_eq!(log["ddtags"], "one,two,three".into());
562-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
562+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
563563
assert_eq!(
564564
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
565565
"12345678abcdefgh12345678abcdefgh"
@@ -706,7 +706,7 @@ async fn ignores_api_key() {
706706
assert_eq!(log["service"], "vector".into());
707707
assert_eq!(log["ddsource"], "curl".into());
708708
assert_eq!(log["ddtags"], "one,two,three".into());
709-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
709+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
710710
assert!(event.metadata().datadog_api_key().is_none());
711711
assert_eq!(
712712
event.metadata().schema_definition(),
@@ -1398,7 +1398,7 @@ async fn split_outputs() {
13981398
assert_eq!(log["service"], "vector".into());
13991399
assert_eq!(log["ddsource"], "curl".into());
14001400
assert_eq!(log["ddtags"], "one,two,three".into());
1401-
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
1401+
assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "datadog_agent".into());
14021402
assert_eq!(
14031403
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
14041404
"12345678abcdefgh12345678abcdefgh"

src/sources/dnstap/mod.rs

+5-5
Original file line number | Diff line number | Diff line change
@@ -212,7 +212,7 @@ pub struct DnstapFrameHandler {
212212
socket_send_buffer_size: Option<usize>,
213213
host_key: Option<OwnedValuePath>,
214214
timestamp_key: Option<OwnedValuePath>,
215-
source_type_key: String,
215+
source_type_key: Option<OwnedValuePath>,
216216
bytes_received: Registered<BytesReceived>,
217217
log_namespace: LogNamespace,
218218
}
@@ -242,7 +242,7 @@ impl DnstapFrameHandler {
242242
socket_send_buffer_size: config.socket_send_buffer_size,
243243
host_key,
244244
timestamp_key: timestamp_key.cloned(),
245-
source_type_key: source_type_key.to_string(),
245+
source_type_key: source_type_key.cloned(),
246246
bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))),
247247
log_namespace,
248248
}
@@ -307,7 +307,7 @@ impl FrameHandler for DnstapFrameHandler {
307307

308308
self.log_namespace.insert_vector_metadata(
309309
&mut log_event,
310-
Some(self.source_type_key()),
310+
self.source_type_key(),
311311
path!("source_type"),
312312
DnstapConfig::NAME,
313313
);
@@ -343,8 +343,8 @@ impl FrameHandler for DnstapFrameHandler {
343343
&self.host_key
344344
}
345345

346-
fn source_type_key(&self) -> &str {
347-
self.source_type_key.as_str()
346+
fn source_type_key(&self) -> Option<&OwnedValuePath> {
347+
self.source_type_key.as_ref()
348348
}
349349

350350
fn timestamp_key(&self) -> Option<&OwnedValuePath> {

src/sources/docker_logs/mod.rs

+3-5
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ use chrono::{DateTime, FixedOffset, Local, ParseError, Utc};
1414
use codecs::{BytesDeserializer, BytesDeserializerConfig};
1515
use futures::{Stream, StreamExt};
1616
use lookup::{
17-
lookup_v2::{parse_value_path, OptionalValuePath},
17+
lookup_v2::{OptionalValuePath},
1818
metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix,
1919
};
2020
use once_cell::sync::Lazy;
@@ -338,9 +338,7 @@ impl SourceConfig for DockerLogsConfig {
338338
Some("timestamp"),
339339
)
340340
.with_vector_metadata(
341-
parse_value_path(log_schema().source_type_key())
342-
.ok()
343-
.as_ref(),
341+
log_schema().source_type_key(),
344342
&owned_value_path!("source_type"),
345343
Kind::bytes(),
346344
None,
@@ -1119,7 +1117,7 @@ impl ContainerLogInfo {
11191117

11201118
log_namespace.insert_vector_metadata(
11211119
&mut log,
1122-
Some(log_schema().source_type_key()),
1120+
log_schema().source_type_key(),
11231121
path!("source_type"),
11241122
Bytes::from_static(DockerLogsConfig::NAME.as_bytes()),
11251123
);

0 commit comments

Comments
 (0)