Skip to content

Commit 28f5c23

Browse files
authored
feat: replace tuples with &OwnedTargetPath wherever possible (#18097)
* feat: replace tuples with &OwnedTargetPath wherever possible * one more replacement * remove unused imports
1 parent 065eecb commit 28f5c23

File tree

19 files changed

+67
-112
lines changed

19 files changed

+67
-112
lines changed

benches/template.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
1313
let index = Template::try_from("index-%Y.%m.%d").unwrap();
1414
let mut event = Event::Log(LogEvent::from("hello world"));
1515
event.as_mut_log().insert(
16-
(
17-
lookup::PathPrefix::Event,
18-
log_schema().timestamp_key().unwrap(),
19-
),
16+
log_schema().timestamp_key_target_path().unwrap(),
2017
Utc::now(),
2118
);
2219

@@ -31,10 +28,7 @@ fn bench_elasticsearch_index(c: &mut Criterion) {
3128
let index = Template::try_from("index").unwrap();
3229
let mut event = Event::Log(LogEvent::from("hello world"));
3330
event.as_mut_log().insert(
34-
(
35-
lookup::PathPrefix::Event,
36-
log_schema().timestamp_key().unwrap(),
37-
),
31+
log_schema().timestamp_key_target_path().unwrap(),
3832
Utc::now(),
3933
);
4034

lib/codecs/src/decoding/format/bytes.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use vector_core::{
88
event::{Event, LogEvent},
99
schema,
1010
};
11-
use vrl::path::PathPrefix;
1211
use vrl::value::Kind;
1312

1413
use super::Deserializer;
@@ -63,7 +62,7 @@ impl BytesDeserializer {
6362
LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
6463
LogNamespace::Legacy => {
6564
let mut log = LogEvent::default();
66-
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);
65+
log.maybe_insert(log_schema().message_key_target_path(), bytes);
6766
log
6867
}
6968
}

lib/codecs/src/decoding/format/gelf.rs

+6-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use bytes::Bytes;
22
use chrono::{DateTime, NaiveDateTime, Utc};
33
use derivative::Derivative;
4-
use lookup::{event_path, owned_value_path, PathPrefix};
4+
use lookup::{event_path, owned_value_path};
55
use serde::{Deserialize, Serialize};
66
use smallvec::{smallvec, SmallVec};
77
use std::collections::HashMap;
@@ -130,20 +130,17 @@ impl GelfDeserializer {
130130
log.insert(FULL_MESSAGE, full_message.to_string());
131131
}
132132

133-
if let Some(timestamp_key) = log_schema().timestamp_key() {
133+
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
134134
if let Some(timestamp) = parsed.timestamp {
135135
let naive = NaiveDateTime::from_timestamp_opt(
136136
f64::trunc(timestamp) as i64,
137137
f64::fract(timestamp) as u32,
138138
)
139139
.expect("invalid timestamp");
140-
log.insert(
141-
(PathPrefix::Event, timestamp_key),
142-
DateTime::<Utc>::from_utc(naive, Utc),
143-
);
140+
log.insert(timestamp_key, DateTime::<Utc>::from_utc(naive, Utc));
144141
// per GELF spec- add timestamp if not provided
145142
} else {
146-
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
143+
log.insert(timestamp_key, Utc::now());
147144
}
148145
}
149146

@@ -293,7 +290,7 @@ mod tests {
293290
Some(&Value::Bytes(Bytes::from_static(b"example.org")))
294291
);
295292
assert_eq!(
296-
log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
293+
log.get(log_schema().message_key_target_path().unwrap()),
297294
Some(&Value::Bytes(Bytes::from_static(
298295
b"A short message that helps you identify what is going on"
299296
)))
@@ -348,7 +345,7 @@ mod tests {
348345
let events = deserialize_gelf_input(&input).unwrap();
349346
assert_eq!(events.len(), 1);
350347
let log = events[0].as_log();
351-
assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
348+
assert!(log.contains(log_schema().message_key_target_path().unwrap()));
352349
}
353350

354351
// filter out id

lib/codecs/src/decoding/format/json.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::convert::TryInto;
33
use bytes::Bytes;
44
use chrono::Utc;
55
use derivative::Derivative;
6-
use lookup::PathPrefix;
76
use smallvec::{smallvec, SmallVec};
87
use vector_config::configurable_component;
98
use vector_core::{
@@ -133,11 +132,11 @@ impl Deserializer for JsonDeserializer {
133132
LogNamespace::Legacy => {
134133
let timestamp = Utc::now();
135134

136-
if let Some(timestamp_key) = log_schema().timestamp_key() {
135+
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
137136
for event in &mut events {
138137
let log = event.as_mut_log();
139-
if !log.contains((PathPrefix::Event, timestamp_key)) {
140-
log.insert((PathPrefix::Event, timestamp_key), timestamp);
138+
if !log.contains(timestamp_key) {
139+
log.insert(timestamp_key, timestamp);
141140
}
142141
}
143142
}
@@ -218,7 +217,7 @@ mod tests {
218217
let log = event.as_log();
219218
assert_eq!(log["bar"], 456.into());
220219
assert_eq!(
221-
log.get((PathPrefix::Event, log_schema().timestamp_key().unwrap()))
220+
log.get(log_schema().timestamp_key_target_path().unwrap())
222221
.is_some(),
223222
namespace == LogNamespace::Legacy
224223
);

lib/codecs/src/decoding/format/syslog.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use bytes::Bytes;
22
use chrono::{DateTime, Datelike, Utc};
33
use derivative::Derivative;
4-
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
4+
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath};
55
use smallvec::{smallvec, SmallVec};
66
use std::borrow::Cow;
77
use std::collections::BTreeMap;
@@ -428,7 +428,7 @@ fn insert_fields_from_syslog(
428428
) {
429429
match log_namespace {
430430
LogNamespace::Legacy => {
431-
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg);
431+
log.maybe_insert(log_schema().message_key_target_path(), parsed.msg);
432432
}
433433
LogNamespace::Vector => {
434434
log.insert(event_path!("message"), parsed.msg);
@@ -439,9 +439,7 @@ fn insert_fields_from_syslog(
439439
let timestamp = DateTime::<Utc>::from(timestamp);
440440
match log_namespace {
441441
LogNamespace::Legacy => {
442-
if let Some(timestamp_key) = log_schema().timestamp_key() {
443-
log.insert((PathPrefix::Event, timestamp_key), timestamp);
444-
}
442+
log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
445443
}
446444
LogNamespace::Vector => {
447445
log.insert(event_path!("timestamp"), timestamp);

lib/opentelemetry-proto/src/convert.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use vector_core::{
77
config::{log_schema, LegacyKey, LogNamespace},
88
event::{Event, LogEvent},
99
};
10-
use vrl::path::PathPrefix;
1110
use vrl::value::Value;
1211

1312
use super::proto::{
@@ -95,7 +94,7 @@ impl ResourceLog {
9594
LogNamespace::Legacy => {
9695
let mut log = LogEvent::default();
9796
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
98-
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v);
97+
log.maybe_insert(log_schema().message_key_target_path(), v);
9998
}
10099
log
101100
}

lib/vector-core/src/event/log_event.rs

+5-10
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use vector_common::{
2020
request_metadata::GetEventCountTags,
2121
EventDataEq,
2222
};
23-
use vrl::path::{OwnedTargetPath, OwnedValuePath};
23+
use vrl::path::OwnedTargetPath;
2424

2525
use super::{
2626
estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
@@ -160,7 +160,7 @@ impl LogEvent {
160160
/// valid for `LogNamespace::Legacy`
161161
pub fn from_str_legacy(msg: impl Into<String>) -> Self {
162162
let mut log = LogEvent::default();
163-
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), msg.into());
163+
log.maybe_insert(log_schema().message_key_target_path(), msg.into());
164164

165165
if let Some(timestamp_key) = log_schema().timestamp_key() {
166166
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
@@ -356,14 +356,9 @@ impl LogEvent {
356356
}
357357
}
358358

359-
pub fn maybe_insert(
360-
&mut self,
361-
prefix: PathPrefix,
362-
path: Option<&OwnedValuePath>,
363-
value: impl Into<Value>,
364-
) {
359+
pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
365360
if let Some(path) = path {
366-
self.insert((prefix, path), value);
361+
self.insert(path, value);
367362
}
368363
}
369364

@@ -572,7 +567,7 @@ mod test_utils {
572567
impl From<Bytes> for LogEvent {
573568
fn from(message: Bytes) -> Self {
574569
let mut log = LogEvent::default();
575-
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message);
570+
log.maybe_insert(log_schema().message_key_target_path(), message);
576571
if let Some(timestamp_key) = log_schema().timestamp_key() {
577572
log.insert((PathPrefix::Event, timestamp_key), Utc::now());
578573
}

lib/vector-core/src/event/vrl_target.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub struct TargetIter<T> {
5454

5555
fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
5656
let mut log = LogEvent::new_with_metadata(metadata);
57-
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), value);
57+
log.maybe_insert(log_schema().message_key_target_path(), value);
5858
log
5959
}
6060

src/sinks/humio/logs.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,7 @@ mod integration_tests {
269269
);
270270

271271
let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
272-
event.insert(
273-
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
274-
ts,
275-
);
272+
event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);
276273

277274
run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
278275

src/sinks/loki/integration_tests.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use vector_core::{
1010
config::LogNamespace,
1111
event::{BatchNotifier, BatchStatus, Event, LogEvent},
1212
};
13-
use vrl::path::PathPrefix;
1413
use vrl::value::{kind::Collection, Kind};
1514

1615
use super::config::{LokiConfig, OutOfOrderAction};
@@ -328,7 +327,7 @@ async fn many_streams() {
328327
let index = (i % 5) * 2;
329328
let message = lines[index]
330329
.as_log()
331-
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
330+
.get(log_schema().message_key_target_path().unwrap())
332331
.unwrap()
333332
.to_string_lossy();
334333
assert_eq!(output, &message);
@@ -338,7 +337,7 @@ async fn many_streams() {
338337
let index = ((i % 5) * 2) + 1;
339338
let message = lines[index]
340339
.as_log()
341-
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
340+
.get(log_schema().message_key_target_path().unwrap())
342341
.unwrap()
343342
.to_string_lossy();
344343
assert_eq!(output, &message);
@@ -385,7 +384,7 @@ async fn interpolate_stream_key() {
385384
for (i, output) in outputs.iter().enumerate() {
386385
let message = lines[i]
387386
.as_log()
388-
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
387+
.get(log_schema().message_key_target_path().unwrap())
389388
.unwrap()
390389
.to_string_lossy();
391390
assert_eq!(output, &message);
@@ -638,7 +637,7 @@ async fn test_out_of_order_events(
638637
assert_eq!(
639638
&expected[i]
640639
.as_log()
641-
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
640+
.get(log_schema().message_key_target_path().unwrap())
642641
.unwrap()
643642
.to_string_lossy(),
644643
output,

src/sources/aws_sqs/integration_tests.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use aws_sdk_sqs::output::CreateQueueOutput;
77
use aws_types::region::Region;
88
use futures::StreamExt;
99
use tokio::time::timeout;
10-
use vrl::path::PathPrefix;
1110

1211
use crate::{
1312
aws::{auth::AwsAuthentication, region::RegionOrEndpoint},
@@ -110,7 +109,7 @@ pub(crate) async fn test() {
110109
for event in events {
111110
let message = event
112111
.as_log()
113-
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
112+
.get(log_schema().message_key_target_path().unwrap())
114113
.unwrap()
115114
.to_string_lossy();
116115
if !expected_messages.remove(message.as_ref()) {

src/sources/aws_sqs/source.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,12 @@ async fn delete_messages(client: SqsClient, receipts: Vec<String>, queue_url: St
219219

220220
#[cfg(test)]
221221
mod tests {
222-
use crate::codecs::DecodingConfig;
223-
use chrono::SecondsFormat;
224-
use lookup::path;
225-
use vrl::path::PathPrefix;
226-
227222
use super::*;
223+
use crate::codecs::DecodingConfig;
228224
use crate::config::{log_schema, SourceConfig};
229225
use crate::sources::aws_sqs::AwsSqsConfig;
226+
use chrono::SecondsFormat;
227+
use lookup::path;
230228

231229
#[tokio::test]
232230
async fn test_decode_vector_namespace() {
@@ -313,7 +311,7 @@ mod tests {
313311
events[0]
314312
.clone()
315313
.as_log()
316-
.get((PathPrefix::Event, log_schema().message_key().unwrap()))
314+
.get(log_schema().message_key_target_path().unwrap())
317315
.unwrap()
318316
.to_string_lossy(),
319317
message

src/sources/docker_logs/tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ mod integration_tests {
970970

971971
event
972972
.into_log()
973-
.remove((PathPrefix::Event, log_schema().message_key().unwrap()))
973+
.remove(log_schema().message_key_target_path().unwrap())
974974
.unwrap()
975975
.to_string_lossy()
976976
.into_owned()

src/sources/fluent/mod.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use chrono::Utc;
99
use codecs::{BytesDeserializerConfig, StreamDecodingError};
1010
use flate2::read::MultiGzDecoder;
1111
use lookup::lookup_v2::parse_value_path;
12-
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix};
12+
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath};
1313
use rmp_serde::{decode, Deserializer, Serializer};
1414
use serde::{Deserialize, Serialize};
1515
use smallvec::{smallvec, SmallVec};
@@ -599,9 +599,7 @@ impl From<FluentEvent<'_>> for LogEvent {
599599
log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
600600
}
601601
LogNamespace::Legacy => {
602-
if let Some(timestamp_key) = log_schema().timestamp_key() {
603-
log.insert((PathPrefix::Event, timestamp_key), timestamp);
604-
}
602+
log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
605603
}
606604
}
607605

src/sources/journald.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use bytes::Bytes;
1212
use chrono::{TimeZone, Utc};
1313
use codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder};
1414
use futures::{poll, stream::BoxStream, task::Poll, StreamExt};
15-
use lookup::{metadata_path, owned_value_path, path, PathPrefix};
15+
use lookup::{metadata_path, owned_value_path, path};
1616
use nix::{
1717
sys::signal::{kill, Signal},
1818
unistd::Pid,
@@ -741,9 +741,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) {
741741
}
742742
LogNamespace::Legacy => {
743743
if let Some(ts) = timestamp {
744-
if let Some(timestamp_key) = log_schema().timestamp_key() {
745-
log.insert((PathPrefix::Event, timestamp_key), ts);
746-
}
744+
log.maybe_insert(log_schema().timestamp_key_target_path(), ts);
747745
}
748746
}
749747
}
@@ -784,7 +782,7 @@ fn create_log_event_from_record(
784782
let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch);
785783

786784
if let Some(message) = log.remove(MESSAGE) {
787-
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message);
785+
log.maybe_insert(log_schema().message_key_target_path(), message);
788786
}
789787

790788
log

0 commit comments

Comments
 (0)