Skip to content

Commit 05a3f44

Browse files
chore: Revert transform definitions (vectordotdev#17146)
* Revert "chore(topology): Transform outputs hash table of OutputId -> Definition (vectordotdev#17059)" This reverts commit 1bdb24d. * Revert "chore(topology): split `build_pieces` into smaller functions (vectordotdev#17037)" This reverts commit 6e6f1eb. * Revert "enhancement(topology): Update transforms to handle multiple definitions (vectordotdev#16793)" This reverts commit e19f4fc. Signed-off-by: Stephen Wakely <[email protected]> --------- Signed-off-by: Stephen Wakely <[email protected]>
1 parent c169131 commit 05a3f44

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+2088
-2714
lines changed

benches/remap.rs

+7-13
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
44
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
55
use indexmap::IndexMap;
66
use vector::{
7-
config::{DataType, TransformOutput},
7+
config::{DataType, Output},
88
event::{Event, LogEvent, Value},
99
transforms::{
1010
remap::{Remap, RemapConfig},
@@ -27,10 +27,8 @@ fn benchmark_remap(c: &mut Criterion) {
2727
let mut group = c.benchmark_group("remap");
2828

2929
let add_fields_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
30-
let mut outputs = TransformOutputsBuf::new_with_capacity(
31-
vec![TransformOutput::new(DataType::all(), HashMap::new())],
32-
1,
33-
);
30+
let mut outputs =
31+
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
3432
tform.transform(event, &mut outputs);
3533
let result = outputs.take_primary();
3634
let output_1 = result.first().unwrap().as_log();
@@ -79,10 +77,8 @@ fn benchmark_remap(c: &mut Criterion) {
7977
});
8078

8179
let json_parser_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
82-
let mut outputs = TransformOutputsBuf::new_with_capacity(
83-
vec![TransformOutput::new(DataType::all(), HashMap::new())],
84-
1,
85-
);
80+
let mut outputs =
81+
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
8682
tform.transform(event, &mut outputs);
8783
let result = outputs.take_primary();
8884
let output_1 = result.first().unwrap().as_log();
@@ -133,10 +129,8 @@ fn benchmark_remap(c: &mut Criterion) {
133129

134130
let coerce_runner =
135131
|tform: &mut Box<dyn SyncTransform>, event: Event, timestamp: DateTime<Utc>| {
136-
let mut outputs = TransformOutputsBuf::new_with_capacity(
137-
vec![TransformOutput::new(DataType::all(), HashMap::new())],
138-
1,
139-
);
132+
let mut outputs =
133+
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
140134
tform.transform(event, &mut outputs);
141135
let result = outputs.take_primary();
142136
let output_1 = result.first().unwrap().as_log();

benches/transform/route.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vector::transforms::{
1212
TransformOutputsBuf,
1313
};
1414
use vector_core::{
15-
config::{DataType, TransformOutput},
15+
config::{DataType, Output},
1616
event::{Event, EventContainer, EventMetadata, LogEvent},
1717
transform::{SyncTransform, TransformContext},
1818
};
@@ -54,10 +54,10 @@ fn route(c: &mut Criterion) {
5454
"bba", "bbca", "dba", "bea", "fba", "gba", "hba", "iba", "jba", "bka", "bal", "bma", "bna",
5555
"boa", "bpa", "bqa", "bra", "bsa", "bta", "bua", "bva", "bwa", "xba", "aby", "zba",
5656
] {
57-
outputs.push(TransformOutput {
57+
outputs.push(Output {
5858
port: Some(String::from(name)),
5959
ty: DataType::Log,
60-
log_schema_definitions: Vec::new(),
60+
log_schema_definition: None,
6161
});
6262
}
6363
let output_buffer: TransformOutputsBuf = TransformOutputsBuf::new_with_capacity(outputs, 10);

lib/vector-core/src/config/mod.rs

+20-216
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
use std::{collections::HashMap, fmt, num::NonZeroUsize};
1+
use std::{fmt, num::NonZeroUsize};
22

33
use bitmask_enum::bitmask;
44
use bytes::Bytes;
55
use chrono::{DateTime, Utc};
66

77
mod global_options;
88
mod log_schema;
9-
pub mod output_id;
109
pub mod proxy;
1110

1211
use crate::event::LogEvent;
1312
pub use global_options::GlobalOptions;
1413
pub use log_schema::{init_log_schema, log_schema, LogSchema};
1514
use lookup::{lookup_v2::ValuePath, path, PathPrefix};
16-
pub use output_id::OutputId;
1715
use serde::{Deserialize, Serialize};
1816
use value::Value;
1917
pub use vector_common::config::ComponentKey;
@@ -102,119 +100,42 @@ impl Input {
102100
}
103101

104102
#[derive(Debug, Clone, PartialEq)]
105-
pub struct SourceOutput {
103+
pub struct Output {
106104
pub port: Option<String>,
107105
pub ty: DataType,
108106

109107
// NOTE: schema definitions are only implemented/supported for log-type events. There is no
110108
// inherent blocker to support other types as well, but it'll require additional work to add
111109
// the relevant schemas, and store them separately in this type.
112-
pub schema_definition: Option<schema::Definition>,
113-
}
114-
115-
impl SourceOutput {
116-
/// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
117-
/// Designed for use in log sources.
118110
///
111+
/// The `None` variant of a schema definition has two distinct meanings for a source component
112+
/// versus a transform component:
119113
///
120-
/// # Panics
121-
///
122-
/// Panics if `ty` does not contain [`DataType::Log`].
123-
#[must_use]
124-
pub fn new_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
125-
assert!(ty.contains(DataType::Log));
126-
127-
Self {
128-
port: None,
129-
ty,
130-
schema_definition: Some(schema_definition),
131-
}
132-
}
133-
134-
/// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
135-
/// Designed for use in metrics sources.
114+
/// For *sources*, a `None` schema is identical to a `Some(Definition::source_default())`.
136115
///
137-
/// Sets the datatype to be [`DataType::Metric`].
138-
#[must_use]
139-
pub fn new_metrics() -> Self {
140-
Self {
141-
port: None,
142-
ty: DataType::Metric,
143-
schema_definition: None,
144-
}
145-
}
116+
/// For a *transform*, a schema [`schema::Definition`] is required if `Datatype` is Log.
117+
pub log_schema_definition: Option<schema::Definition>,
118+
}
146119

147-
/// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
148-
/// Designed for use in trace sources.
120+
impl Output {
121+
/// Create a default `Output` of the given data type.
149122
///
150-
/// Sets the datatype to be [`DataType::Trace`].
151-
#[must_use]
152-
pub fn new_traces() -> Self {
123+
/// A default output is one without a port identifier (i.e. not a named output) and the default
124+
/// output consumers will receive if they declare the component itself as an input.
125+
pub fn default(ty: DataType) -> Self {
153126
Self {
154127
port: None,
155-
ty: DataType::Trace,
156-
schema_definition: None,
128+
ty,
129+
log_schema_definition: None,
157130
}
158131
}
159132

160-
/// Return the schema [`schema::Definition`] from this output.
161-
///
162-
/// Takes a `schema_enabled` flag to determine if the full definition including the fields
163-
/// and associated types should be returned, or if a simple definition should be returned.
164-
/// A simple definition is just the default for the namespace. For the Vector namespace the
165-
/// meanings are included.
166-
/// Schema enabled is set in the users configuration.
167-
#[must_use]
168-
pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
169-
self.schema_definition.as_ref().map(|definition| {
170-
if schema_enabled {
171-
definition.clone()
172-
} else {
173-
let mut new_definition =
174-
schema::Definition::default_for_namespace(definition.log_namespaces());
175-
176-
if definition.log_namespaces().contains(&LogNamespace::Vector) {
177-
new_definition.add_meanings(definition.meanings());
178-
}
179-
180-
new_definition
181-
}
182-
})
183-
}
184-
}
185-
186-
impl SourceOutput {
187-
/// Set the port name for this `SourceOutput`.
133+
/// Set the schema definition for this `Output`.
188134
#[must_use]
189-
pub fn with_port(mut self, name: impl Into<String>) -> Self {
190-
self.port = Some(name.into());
135+
pub fn with_schema_definition(mut self, schema_definition: schema::Definition) -> Self {
136+
self.log_schema_definition = Some(schema_definition);
191137
self
192138
}
193-
}
194-
195-
#[derive(Debug, Clone, PartialEq)]
196-
pub struct TransformOutput {
197-
pub port: Option<String>,
198-
pub ty: DataType,
199-
200-
/// For *transforms* if `Datatype` is [`DataType::Log`], if schema is
201-
/// enabled, at least one definition should be output. If the transform
202-
/// has multiple connected sources, it is possible to have multiple output
203-
/// definitions - one for each input.
204-
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
205-
}
206-
207-
impl TransformOutput {
208-
/// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
209-
/// Designed for use in transforms.
210-
#[must_use]
211-
pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
212-
Self {
213-
port: None,
214-
ty,
215-
log_schema_definitions: schema_definitions,
216-
}
217-
}
218139

219140
/// Set the port name for this `Output`.
220141
#[must_use]
@@ -224,18 +145,6 @@ impl TransformOutput {
224145
}
225146
}
226147

227-
/// Simple utility function that can be used by transforms that make no changes to
228-
/// the schema definitions of events.
229-
/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`].
230-
pub fn clone_input_definitions(
231-
input_definitions: &[(OutputId, schema::Definition)],
232-
) -> HashMap<OutputId, schema::Definition> {
233-
input_definitions
234-
.iter()
235-
.map(|(output, definition)| (output.clone(), definition.clone()))
236-
.collect()
237-
}
238-
239148
/// Source-specific end-to-end acknowledgements configuration.
240149
///
241150
/// This type exists solely to provide a source-specific description of the `acknowledgements`
@@ -518,12 +427,10 @@ impl LogNamespace {
518427

519428
#[cfg(test)]
520429
mod test {
521-
use super::*;
430+
use crate::config::{init_log_schema, LogNamespace, LogSchema};
522431
use crate::event::LogEvent;
523432
use chrono::Utc;
524-
use lookup::{event_path, owned_value_path, OwnedTargetPath};
525-
use value::Kind;
526-
use vector_common::btreemap;
433+
use lookup::event_path;
527434

528435
#[test]
529436
fn test_insert_standard_vector_source_metadata() {
@@ -539,107 +446,4 @@ mod test {
539446

540447
assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
541448
}
542-
543-
#[test]
544-
fn test_source_definitions_legacy() {
545-
let definition = schema::Definition::empty_legacy_namespace()
546-
.with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
547-
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
548-
let output = SourceOutput::new_logs(DataType::Log, definition);
549-
550-
let valid_event = LogEvent::from(Value::from(btreemap! {
551-
"zork" => "norknoog",
552-
"nork" => 32
553-
}))
554-
.into();
555-
556-
let invalid_event = LogEvent::from(Value::from(btreemap! {
557-
"nork" => 32
558-
}))
559-
.into();
560-
561-
// Get a definition with schema enabled.
562-
let new_definition = output.schema_definition(true).unwrap();
563-
564-
// Meanings should still exist.
565-
assert_eq!(
566-
Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
567-
new_definition.meaning_path("zork")
568-
);
569-
570-
// Events should have the schema validated.
571-
new_definition.assert_valid_for_event(&valid_event);
572-
new_definition.assert_invalid_for_event(&invalid_event);
573-
574-
// There should be the default legacy definition without schemas enabled.
575-
assert_eq!(
576-
Some(schema::Definition::default_legacy_namespace()),
577-
output.schema_definition(false)
578-
);
579-
}
580-
581-
#[test]
582-
fn test_source_definitons_vector() {
583-
let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
584-
.with_metadata_field(
585-
&owned_value_path!("vector", "zork"),
586-
Kind::integer(),
587-
Some("zork"),
588-
)
589-
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
590-
591-
let output = SourceOutput::new_logs(DataType::Log, definition);
592-
593-
let mut valid_event = LogEvent::from(Value::from(btreemap! {
594-
"nork" => 32
595-
}));
596-
597-
valid_event
598-
.metadata_mut()
599-
.value_mut()
600-
.insert(path!("vector").concat("zork"), 32);
601-
602-
let valid_event = valid_event.into();
603-
604-
let mut invalid_event = LogEvent::from(Value::from(btreemap! {
605-
"nork" => 32
606-
}));
607-
608-
invalid_event
609-
.metadata_mut()
610-
.value_mut()
611-
.insert(path!("vector").concat("zork"), "noog");
612-
613-
let invalid_event = invalid_event.into();
614-
615-
// Get a definition with schema enabled.
616-
let new_definition = output.schema_definition(true).unwrap();
617-
618-
// Meanings should still exist.
619-
assert_eq!(
620-
Some(&OwnedTargetPath::metadata(owned_value_path!(
621-
"vector", "zork"
622-
))),
623-
new_definition.meaning_path("zork")
624-
);
625-
626-
// Events should have the schema validated.
627-
new_definition.assert_valid_for_event(&valid_event);
628-
new_definition.assert_invalid_for_event(&invalid_event);
629-
630-
// Get a definition without schema enabled.
631-
let new_definition = output.schema_definition(false).unwrap();
632-
633-
// Meanings should still exist.
634-
assert_eq!(
635-
Some(&OwnedTargetPath::metadata(owned_value_path!(
636-
"vector", "zork"
637-
))),
638-
new_definition.meaning_path("zork")
639-
);
640-
641-
// Events should not have the schema validated.
642-
new_definition.assert_valid_for_event(&valid_event);
643-
new_definition.assert_valid_for_event(&invalid_event);
644-
}
645449
}

0 commit comments

Comments
 (0)