Skip to content

Commit 8916ec1

Browse files
committed
Revert "chore(topology): Transform outputs hash table of OutputId -> Definition (#17059)"
This reverts commit 1bdb24d.
1 parent ba63e21 commit 8916ec1

File tree

24 files changed

+212
-283
lines changed

24 files changed

+212
-283
lines changed

benches/remap.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ fn benchmark_remap(c: &mut Criterion) {
2828

2929
let add_fields_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
3030
let mut outputs = TransformOutputsBuf::new_with_capacity(
31-
vec![TransformOutput::new(DataType::all(), HashMap::new())],
31+
vec![TransformOutput::new(DataType::all(), vec![])],
3232
1,
3333
);
3434
tform.transform(event, &mut outputs);
@@ -80,7 +80,7 @@ fn benchmark_remap(c: &mut Criterion) {
8080

8181
let json_parser_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
8282
let mut outputs = TransformOutputsBuf::new_with_capacity(
83-
vec![TransformOutput::new(DataType::all(), HashMap::new())],
83+
vec![TransformOutput::new(DataType::all(), vec![])],
8484
1,
8585
);
8686
tform.transform(event, &mut outputs);
@@ -134,7 +134,7 @@ fn benchmark_remap(c: &mut Criterion) {
134134
let coerce_runner =
135135
|tform: &mut Box<dyn SyncTransform>, event: Event, timestamp: DateTime<Utc>| {
136136
let mut outputs = TransformOutputsBuf::new_with_capacity(
137-
vec![TransformOutput::new(DataType::all(), HashMap::new())],
137+
vec![TransformOutput::new(DataType::all(), vec![])],
138138
1,
139139
);
140140
tform.transform(event, &mut outputs);

lib/vector-core/src/config/mod.rs

+3-17
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
use std::{collections::HashMap, fmt, num::NonZeroUsize};
1+
use std::{fmt, num::NonZeroUsize};
22

33
use bitmask_enum::bitmask;
44
use bytes::Bytes;
55
use chrono::{DateTime, Utc};
66

77
mod global_options;
88
mod log_schema;
9-
pub mod output_id;
109
pub mod proxy;
1110

1211
use crate::event::LogEvent;
1312
pub use global_options::GlobalOptions;
1413
pub use log_schema::{init_log_schema, log_schema, LogSchema};
1514
use lookup::{lookup_v2::ValuePath, path, PathPrefix};
16-
pub use output_id::OutputId;
1715
use serde::{Deserialize, Serialize};
1816
use value::Value;
1917
pub use vector_common::config::ComponentKey;
@@ -201,14 +199,14 @@ pub struct TransformOutput {
201199
/// enabled, at least one definition should be output. If the transform
202200
/// has multiple connected sources, it is possible to have multiple output
203201
/// definitions - one for each input.
204-
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
202+
pub log_schema_definitions: Vec<schema::Definition>,
205203
}
206204

207205
impl TransformOutput {
208206
/// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
209207
/// Designed for use in transforms.
210208
#[must_use]
211-
pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
209+
pub fn new(ty: DataType, schema_definitions: Vec<schema::Definition>) -> Self {
212210
Self {
213211
port: None,
214212
ty,
@@ -224,18 +222,6 @@ impl TransformOutput {
224222
}
225223
}
226224

227-
/// Simple utility function that can be used by transforms that make no changes to
228-
/// the schema definitions of events.
229-
/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`].
230-
pub fn clone_input_definitions(
231-
input_definitions: &[(OutputId, schema::Definition)],
232-
) -> HashMap<OutputId, schema::Definition> {
233-
input_definitions
234-
.iter()
235-
.map(|(output, definition)| (output.clone(), definition.clone()))
236-
.collect()
237-
}
238-
239225
/// Source-specific end-to-end acknowledgements configuration.
240226
///
241227
/// This type exists solely to provide a source-specific description of the `acknowledgements`

lib/vector-core/src/config/output_id.rs

-90
This file was deleted.

src/config/graph.rs

+7-10
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ mod test {
399399
in_ty,
400400
outputs: vec![TransformOutput::new(
401401
out_ty,
402-
[("test".into(), Definition::default_legacy_namespace())].into(),
402+
vec![Definition::default_legacy_namespace()],
403403
)],
404404
},
405405
);
@@ -415,11 +415,8 @@ mod test {
415415
let id = id.into();
416416
match self.nodes.get_mut(&id) {
417417
Some(Node::Transform { outputs, .. }) => outputs.push(
418-
TransformOutput::new(
419-
ty,
420-
[("test".into(), Definition::default_legacy_namespace())].into(),
421-
)
422-
.with_port(name),
418+
TransformOutput::new(ty, vec![Definition::default_legacy_namespace()])
419+
.with_port(name),
423420
),
424421
_ => panic!("invalid transform"),
425422
}
@@ -654,11 +651,11 @@ mod test {
654651
outputs: vec![
655652
TransformOutput::new(
656653
DataType::all(),
657-
[("test".into(), Definition::default_legacy_namespace())].into(),
654+
vec![Definition::default_legacy_namespace()],
658655
),
659656
TransformOutput::new(
660657
DataType::all(),
661-
[("test".into(), Definition::default_legacy_namespace())].into(),
658+
vec![Definition::default_legacy_namespace()],
662659
)
663660
.with_port("bar"),
664661
],
@@ -679,11 +676,11 @@ mod test {
679676
outputs: vec![
680677
TransformOutput::new(
681678
DataType::all(),
682-
[("test".into(), Definition::default_legacy_namespace())].into(),
679+
vec![Definition::default_legacy_namespace()],
683680
),
684681
TransformOutput::new(
685682
DataType::all(),
686-
[("test".into(), Definition::default_legacy_namespace())].into(),
683+
vec![Definition::default_legacy_namespace()],
687684
)
688685
.with_port("errors"),
689686
],

src/config/id.rs

+88-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
use std::ops::Deref;
1+
use std::{fmt, ops::Deref};
22

33
use vector_config::configurable_component;
44
pub use vector_core::config::ComponentKey;
55

6+
use super::schema;
7+
68
/// A list of upstream [source][sources] or [transform][transforms] IDs.
79
///
810
/// Wildcards (`*`) are supported.
@@ -94,3 +96,88 @@ impl<T> From<Vec<T>> for Inputs<T> {
9496
Self(inputs)
9597
}
9698
}
99+
100+
/// Component output identifier.
101+
#[configurable_component]
102+
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
103+
pub struct OutputId {
104+
/// The component to which the output belongs.
105+
pub component: ComponentKey,
106+
107+
/// The output port name, if not the default.
108+
pub port: Option<String>,
109+
}
110+
111+
impl OutputId {
112+
/// Some situations, for example when validating a config file requires running the
113+
/// transforms::output function to retrieve the outputs, but we don't have an
114+
/// `OutputId` from a source. This gives us an `OutputId` that we can use.
115+
///
116+
/// TODO: This is not a pleasant solution, but would require some significant refactoring
117+
/// to the topology code to avoid.
118+
pub fn dummy() -> Self {
119+
Self {
120+
component: "dummy".into(),
121+
port: None,
122+
}
123+
}
124+
125+
/// Given a list of [`schema::Definition`]s, returns a [`Vec`] of tuples of
126+
/// this `OutputId` with each `Definition`.
127+
pub fn with_definitions(
128+
&self,
129+
definitions: impl IntoIterator<Item = schema::Definition>,
130+
) -> Vec<(OutputId, schema::Definition)> {
131+
definitions
132+
.into_iter()
133+
.map(|definition| (self.clone(), definition))
134+
.collect()
135+
}
136+
}
137+
138+
impl fmt::Display for OutputId {
139+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140+
match &self.port {
141+
None => self.component.fmt(f),
142+
Some(port) => write!(f, "{}.{}", self.component, port),
143+
}
144+
}
145+
}
146+
147+
impl From<ComponentKey> for OutputId {
148+
fn from(key: ComponentKey) -> Self {
149+
Self {
150+
component: key,
151+
port: None,
152+
}
153+
}
154+
}
155+
156+
impl From<&ComponentKey> for OutputId {
157+
fn from(key: &ComponentKey) -> Self {
158+
Self::from(key.clone())
159+
}
160+
}
161+
162+
impl From<(&ComponentKey, String)> for OutputId {
163+
fn from((key, name): (&ComponentKey, String)) -> Self {
164+
Self {
165+
component: key.clone(),
166+
port: Some(name),
167+
}
168+
}
169+
}
170+
171+
// This panicking implementation is convenient for testing, but should never be enabled for use
172+
// outside of tests.
173+
#[cfg(test)]
174+
impl From<&str> for OutputId {
175+
fn from(s: &str) -> Self {
176+
assert!(
177+
!s.contains('.'),
178+
"Cannot convert dotted paths to strings without more context"
179+
);
180+
let component = ComponentKey::from(s);
181+
component.into()
182+
}
183+
}

src/config/mod.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use cmd::{cmd, Opts};
4545
pub use diff::ConfigDiff;
4646
pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
4747
pub use format::{Format, FormatHint};
48-
pub use id::{ComponentKey, Inputs};
48+
pub use id::{ComponentKey, Inputs, OutputId};
4949
pub use loading::{
5050
load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider_and_secrets,
5151
load_from_str, load_source_from_paths, merge_path_lists, process_paths, CONFIG_PATHS,
@@ -57,9 +57,7 @@ pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
5757
pub use transform::{BoxedTransform, TransformConfig, TransformContext, TransformOuter};
5858
pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
5959
pub use validation::warnings;
60-
pub use vector_core::config::{
61-
init_log_schema, log_schema, proxy::ProxyConfig, LogSchema, OutputId,
62-
};
60+
pub use vector_core::config::{init_log_schema, log_schema, proxy::ProxyConfig, LogSchema};
6361

6462
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
6563
pub enum ConfigPath {

src/config/transform.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ pub struct TransformContext {
111111
///
112112
/// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of
113113
/// that `TransformOutput`.
114-
pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
114+
pub schema_definitions: HashMap<Option<String>, Vec<schema::Definition>>,
115115

116116
/// The schema definition created by merging all inputs of the transform.
117117
///
@@ -129,7 +129,7 @@ impl Default for TransformContext {
129129
key: Default::default(),
130130
globals: Default::default(),
131131
enrichment_tables: Default::default(),
132-
schema_definitions: HashMap::from([(None, HashMap::new())]),
132+
schema_definitions: HashMap::from([(None, vec![schema::Definition::any()])]),
133133
merged_schema_definition: schema::Definition::any(),
134134
schema: SchemaOptions::default(),
135135
}
@@ -148,9 +148,7 @@ impl TransformContext {
148148
}
149149

150150
#[cfg(any(test, feature = "test"))]
151-
pub fn new_test(
152-
schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
153-
) -> Self {
151+
pub fn new_test(schema_definitions: HashMap<Option<String>, Vec<schema::Definition>>) -> Self {
154152
Self {
155153
schema_definitions,
156154
..Default::default()

src/test_util/mock/transforms/basic.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl TransformConfig for BasicTransformConfig {
5858
DataType::all(),
5959
definitions
6060
.iter()
61-
.map(|(output, definition)| (output.clone(), definition.clone()))
61+
.map(|(_output, definition)| definition.clone())
6262
.collect(),
6363
)]
6464
}

src/test_util/mock/transforms/noop.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl TransformConfig for NoopTransformConfig {
4848
DataType::all(),
4949
definitions
5050
.iter()
51-
.map(|(output, definition)| (output.clone(), definition.clone()))
51+
.map(|(_output, definition)| definition.clone())
5252
.collect(),
5353
)]
5454
}

0 commit comments

Comments
 (0)