Skip to content

Commit 1f722b5

Browse files
committed
feat: replace LogEvent 'String's with '&OwnedTargetPath's
1 parent 8663602 commit 1f722b5

File tree

9 files changed

+141
-105
lines changed

9 files changed

+141
-105
lines changed

lib/vector-core/src/config/log_schema.rs

+44-23
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use lookup::lookup_v2::OptionalValuePath;
1+
use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
22
use lookup::{OwnedTargetPath, OwnedValuePath};
33
use once_cell::sync::{Lazy, OnceCell};
44
use vector_config::configurable_component;
5+
use vrl::path::PathPrefix;
56

67
static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
78
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
@@ -49,24 +50,24 @@ pub struct LogSchema {
4950
///
5051
/// This would be the field that holds the raw message, such as a raw log line.
5152
#[serde(default = "LogSchema::default_message_key")]
52-
message_key: OptionalValuePath,
53+
message_key: OptionalTargetPath,
5354

5455
/// The name of the event field to treat as the event timestamp.
5556
#[serde(default = "LogSchema::default_timestamp_key")]
56-
timestamp_key: OptionalValuePath,
57+
timestamp_key: OptionalTargetPath,
5758

5859
/// The name of the event field to treat as the host which sent the message.
5960
///
6061
/// This field will generally represent a real host, or container, that generated the message,
6162
/// but is somewhat source-dependent.
6263
#[serde(default = "LogSchema::default_host_key")]
63-
host_key: OptionalValuePath,
64+
host_key: OptionalTargetPath,
6465

6566
/// The name of the event field to set the source identifier in.
6667
///
6768
/// This field will be set by the Vector source that the event was created in.
6869
#[serde(default = "LogSchema::default_source_type_key")]
69-
source_type_key: OptionalValuePath,
70+
source_type_key: OptionalTargetPath,
7071

7172
/// The name of the event field to set the event metadata in.
7273
///
@@ -89,28 +90,28 @@ impl Default for LogSchema {
8990
}
9091

9192
impl LogSchema {
92-
fn default_message_key() -> OptionalValuePath {
93-
OptionalValuePath::new(MESSAGE)
93+
fn default_message_key() -> OptionalTargetPath {
94+
OptionalTargetPath::event(MESSAGE)
9495
}
9596

96-
fn default_timestamp_key() -> OptionalValuePath {
97-
OptionalValuePath::new(TIMESTAMP)
97+
fn default_timestamp_key() -> OptionalTargetPath {
98+
OptionalTargetPath::event(TIMESTAMP)
9899
}
99100

100-
fn default_host_key() -> OptionalValuePath {
101-
OptionalValuePath::new(HOST)
101+
fn default_host_key() -> OptionalTargetPath {
102+
OptionalTargetPath::event(HOST)
102103
}
103104

104-
fn default_source_type_key() -> OptionalValuePath {
105-
OptionalValuePath::new(SOURCE_TYPE)
105+
fn default_source_type_key() -> OptionalTargetPath {
106+
OptionalTargetPath::event(SOURCE_TYPE)
106107
}
107108

108109
fn default_metadata_key() -> OptionalValuePath {
109110
OptionalValuePath::new(METADATA)
110111
}
111112

112113
pub fn message_key(&self) -> Option<&OwnedValuePath> {
113-
self.message_key.path.as_ref()
114+
self.message_key.path.as_ref().map(|key| &key.path)
114115
}
115116

116117
/// Returns an `OwnedTargetPath` of the message key.
@@ -119,39 +120,59 @@ impl LogSchema {
119120
/// This should only be used where the result will either be cached,
120121
/// or performance isn't critical, since this requires parsing / memory allocation.
121122
pub fn owned_message_path(&self) -> OwnedTargetPath {
122-
OwnedTargetPath::event(self.message_key.clone().path.expect("valid message key"))
123+
self.message_key
124+
.path
125+
.as_ref()
126+
.expect("valid message key")
127+
.clone()
123128
}
124129

125130
pub fn timestamp_key(&self) -> Option<&OwnedValuePath> {
126-
self.timestamp_key.path.as_ref()
131+
self.timestamp_key.as_ref().map(|key| &key.path)
127132
}
128133

129134
pub fn host_key(&self) -> Option<&OwnedValuePath> {
130-
self.host_key.path.as_ref()
135+
self.host_key.as_ref().map(|key| &key.path)
131136
}
132137

133138
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
134-
self.source_type_key.path.as_ref()
139+
self.source_type_key.as_ref().map(|key| &key.path)
135140
}
136141

137142
pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
138143
self.metadata_key.path.as_ref()
139144
}
140145

146+
pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
147+
self.message_key.as_ref()
148+
}
149+
150+
pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> {
151+
self.timestamp_key.as_ref()
152+
}
153+
154+
pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> {
155+
self.host_key.as_ref()
156+
}
157+
158+
pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> {
159+
self.source_type_key.as_ref()
160+
}
161+
141162
pub fn set_message_key(&mut self, path: Option<OwnedValuePath>) {
142-
self.message_key = OptionalValuePath { path };
163+
self.message_key = OptionalTargetPath::from(PathPrefix::Event, path);
143164
}
144165

145-
pub fn set_timestamp_key(&mut self, v: Option<OwnedValuePath>) {
146-
self.timestamp_key = OptionalValuePath { path: v };
166+
pub fn set_timestamp_key(&mut self, path: Option<OwnedValuePath>) {
167+
self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path);
147168
}
148169

149170
pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
150-
self.host_key = OptionalValuePath { path };
171+
self.host_key = OptionalTargetPath::from(PathPrefix::Event, path);
151172
}
152173

153174
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
154-
self.source_type_key = OptionalValuePath { path };
175+
self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path);
155176
}
156177

157178
pub fn set_metadata_key(&mut self, path: Option<OwnedValuePath>) {

lib/vector-core/src/event/log_event.rs

+24-23
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use vector_common::{
2020
request_metadata::GetEventCountTags,
2121
EventDataEq,
2222
};
23-
use vrl::path::OwnedValuePath;
23+
use vrl::path::{OwnedTargetPath, OwnedValuePath};
2424

2525
use super::{
2626
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
@@ -32,6 +32,15 @@ use crate::config::LogNamespace;
3232
use crate::config::{log_schema, telemetry};
3333
use crate::{event::MaybeAsLogMut, ByteSizeOf};
3434
use lookup::{metadata_path, path};
35+
use once_cell::sync::Lazy;
36+
use vrl::owned_value_path;
37+
38+
static VECTOR_SOURCE_TYPE_PATH: Lazy<Option<OwnedTargetPath>> = Lazy::new(|| {
39+
Some(OwnedTargetPath::metadata(owned_value_path!(
40+
"vector",
41+
"source_type"
42+
)))
43+
});
3544

3645
#[derive(Debug, Deserialize)]
3746
struct Inner {
@@ -296,7 +305,7 @@ impl LogEvent {
296305

297306
/// Retrieves the value of a field based on its meaning.
298307
/// This will first check if the value has previously been dropped. It is worth being
299-
/// aware that if the field has been dropped and then some how readded, we still fetch
308+
/// aware that if the field has been dropped and then somehow re-added, we still fetch
300309
/// the dropped value here.
301310
pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
302311
if let Some(dropped) = self.metadata().dropped_field(&meaning) {
@@ -309,12 +318,11 @@ impl LogEvent {
309318
}
310319
}
311320

312-
// TODO(Jean): Once the event API uses `Lookup`, the allocation here can be removed.
313-
pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<String> {
321+
/// Retrieves the target path of a field based on the specified `meaning`.
322+
fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
314323
self.metadata()
315324
.schema_definition()
316325
.meaning_path(meaning.as_ref())
317-
.map(std::string::ToString::to_string)
318326
}
319327

320328
#[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
@@ -452,45 +460,37 @@ impl LogEvent {
452460
impl LogEvent {
453461
/// Fetches the "message" path of the event. This is either from the "message" semantic meaning (Vector namespace)
454462
/// or from the message key set on the "Global Log Schema" (Legacy namespace).
455-
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
456-
// "Global Log Schema" are updated to the new path lookup code
457-
pub fn message_path(&self) -> Option<String> {
463+
pub fn message_path(&self) -> Option<&OwnedTargetPath> {
458464
match self.namespace() {
459465
LogNamespace::Vector => self.find_key_by_meaning("message"),
460-
LogNamespace::Legacy => log_schema().message_key().map(ToString::to_string),
466+
LogNamespace::Legacy => log_schema().message_key_target_path(),
461467
}
462468
}
463469

464470
/// Fetches the "timestamp" path of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
465471
/// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
466-
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
467-
// "Global Log Schema" are updated to the new path lookup code
468-
pub fn timestamp_path(&self) -> Option<String> {
472+
pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
469473
match self.namespace() {
470474
LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
471-
LogNamespace::Legacy => log_schema().timestamp_key().map(ToString::to_string),
475+
LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
472476
}
473477
}
474478

475479
/// Fetches the `host` path of the event. This is either from the "host" semantic meaning (Vector namespace)
476480
/// or from the host key set on the "Global Log Schema" (Legacy namespace).
477-
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
478-
// "Global Log Schema" are updated to the new path lookup code
479-
pub fn host_path(&self) -> Option<String> {
481+
pub fn host_path(&self) -> Option<&OwnedTargetPath> {
480482
match self.namespace() {
481483
LogNamespace::Vector => self.find_key_by_meaning("host"),
482-
LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string),
484+
LogNamespace::Legacy => log_schema().host_key_target_path(),
483485
}
484486
}
485487

486488
/// Fetches the `source_type` path of the event. This is either from the `source_type` Vector metadata field (Vector namespace)
487489
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
488-
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
489-
// "Global Log Schema" are updated to the new path lookup code
490-
pub fn source_type_path(&self) -> Option<String> {
490+
pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
491491
match self.namespace() {
492-
LogNamespace::Vector => Some("%vector.source_type".to_string()),
493-
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
492+
LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
493+
LogNamespace::Legacy => log_schema().source_type_key_target_path(),
494494
}
495495
}
496496

@@ -520,7 +520,8 @@ impl LogEvent {
520520
/// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
521521
pub fn remove_timestamp(&mut self) -> Option<Value> {
522522
self.timestamp_path()
523-
.and_then(|key| self.remove(key.as_str()))
523+
.cloned()
524+
.and_then(|key| self.remove(&key))
524525
}
525526

526527
/// Fetches the `host` of the event. This is either from the "host" semantic meaning (Vector namespace)

lib/vector-lookup/src/lookup_v2/optional_path.rs

+20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use vector_config::configurable_component;
22
use vrl::owned_value_path;
3+
use vrl::path::PathPrefix;
34

45
use crate::lookup_v2::PathParseError;
56
use crate::{OwnedTargetPath, OwnedValuePath};
@@ -16,6 +17,25 @@ impl OptionalTargetPath {
1617
pub fn none() -> Self {
1718
Self { path: None }
1819
}
20+
21+
pub fn event(path: &str) -> Self {
22+
Self {
23+
path: Some(OwnedTargetPath {
24+
prefix: PathPrefix::Event,
25+
path: owned_value_path!(path),
26+
}),
27+
}
28+
}
29+
30+
pub fn from(prefix: PathPrefix, path: Option<OwnedValuePath>) -> Self {
31+
Self {
32+
path: path.map(|path| OwnedTargetPath { prefix, path }),
33+
}
34+
}
35+
36+
pub fn as_ref(&self) -> Option<&OwnedTargetPath> {
37+
self.path.as_ref()
38+
}
1939
}
2040

2141
impl TryFrom<String> for OptionalTargetPath {

src/sinks/datadog/events/sink.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,26 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
5858
if !log.contains("text") {
5959
let message_path = log
6060
.message_path()
61-
.expect("message is required (make sure the \"message\" semantic meaning is set)");
62-
log.rename_key(message_path.as_str(), event_path!("text"))
61+
.expect("message is required (make sure the \"message\" semantic meaning is set)")
62+
.clone();
63+
log.rename_key(&message_path, event_path!("text"));
6364
}
6465

6566
if !log.contains("host") {
66-
if let Some(host_path) = log.host_path() {
67-
log.rename_key(host_path.as_str(), event_path!("host"));
67+
if let Some(host_path) = log.host_path().cloned().as_ref() {
68+
log.rename_key(host_path, event_path!("host"));
6869
}
6970
}
7071

7172
if !log.contains("date_happened") {
72-
if let Some(timestamp_path) = log.timestamp_path() {
73-
log.rename_key(timestamp_path.as_str(), "date_happened");
73+
if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() {
74+
log.rename_key(timestamp_path, "date_happened");
7475
}
7576
}
7677

7778
if !log.contains("source_type_name") {
78-
if let Some(source_type_path) = log.source_type_path() {
79-
log.rename_key(source_type_path.as_str(), "source_type_name")
79+
if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
80+
log.rename_key(source_type_path, "source_type_name");
8081
}
8182
}
8283

src/sinks/datadog/logs/sink.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,21 @@ impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
134134
let log = event.as_mut_log();
135135
let message_path = log
136136
.message_path()
137-
.expect("message is required (make sure the \"message\" semantic meaning is set)");
138-
log.rename_key(message_path.as_str(), event_path!("message"));
137+
.expect("message is required (make sure the \"message\" semantic meaning is set)")
138+
.clone();
139+
log.rename_key(&message_path, event_path!("message"));
139140

140-
if let Some(host_path) = log.host_path() {
141-
log.rename_key(host_path.as_str(), event_path!("hostname"));
141+
if let Some(host_path) = log.host_path().cloned().as_ref() {
142+
log.rename_key(host_path, event_path!("hostname"));
142143
}
143144

144-
if let Some(Value::Timestamp(ts)) = log.remove(
145-
log
145+
let message_path = log
146146
.timestamp_path()
147-
.expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)")
148-
.as_str()
149-
) {
147+
.expect(
148+
"timestamp is required (make sure the \"timestamp\" semantic meaning is set)",
149+
)
150+
.clone();
151+
if let Some(Value::Timestamp(ts)) = log.remove(&message_path) {
150152
log.insert(
151153
event_path!("timestamp"),
152154
Value::Integer(ts.timestamp_millis()),

src/sinks/elasticsearch/config.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -355,15 +355,12 @@ impl DataStreamConfig {
355355

356356
/// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema.
357357
pub fn remap_timestamp(&self, log: &mut LogEvent) {
358-
if let Some(timestamp_key) = log.timestamp_path() {
359-
if timestamp_key == DATA_STREAM_TIMESTAMP_KEY {
358+
if let Some(timestamp_key) = log.timestamp_path().cloned() {
359+
if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
360360
return;
361361
}
362362

363-
log.rename_key(
364-
timestamp_key.as_str(),
365-
event_path!(DATA_STREAM_TIMESTAMP_KEY),
366-
)
363+
log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
367364
}
368365
}
369366

0 commit comments

Comments
 (0)