Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove transform type coercion #17411

Merged
merged 5 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 0 additions & 42 deletions lib/vector-core/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,6 @@ impl Transform {
Transform::Function(Box::new(v))
}

/// Mutably borrow the inner transform as a function transform.
///
/// # Panics
///
/// If the transform is not a [`FunctionTransform`] this will panic.
pub fn as_function(&mut self) -> &mut Box<dyn FunctionTransform> {
match self {
Transform::Function(t) => t,
_ => panic!(
"Called `Transform::as_function` on something that was not a function variant."
),
}
}

/// Transmute the inner transform into a function transform.
///
/// # Panics
///
/// If the transform is not a [`FunctionTransform`] this will panic.
pub fn into_function(self) -> Box<dyn FunctionTransform> {
match self {
Transform::Function(t) => t,
_ => panic!(
"Called `Transform::into_function` on something that was not a function variant."
),
}
}

/// Create a new synchronous transform.
///
/// This is a broader trait than the simple [`FunctionTransform`] in that it allows transforms
Expand Down Expand Up @@ -104,20 +76,6 @@ impl Transform {
Transform::Task(Box::new(WrapEventTask(v)))
}

/// Mutably borrow the inner transform as a task transform.
///
/// # Panics
///
/// If the transform is a [`FunctionTransform`] this will panic.
pub fn as_task(&mut self) -> &mut Box<dyn TaskTransform<EventArray>> {
match self {
Transform::Task(t) => t,
_ => {
panic!("Called `Transform::as_task` on something that was not a task variant.")
}
}
}

/// Transmute the inner transform into a task transform.
///
/// # Panics
Expand Down
18 changes: 9 additions & 9 deletions src/sinks/humio/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ use indoc::indoc;
use lookup::lookup_v2::OptionalValuePath;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;
use vector_core::{sink::StreamSink, transform::Transform};
use vector_core::sink::StreamSink;

use super::{
host_key,
logs::{HumioLogsConfig, HOST},
};
use crate::{
config::{
AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, TransformConfig,
TransformContext,
AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, TransformContext,
},
event::{Event, EventArray, EventContainer},
sinks::{
Expand All @@ -25,7 +24,10 @@ use crate::{
},
template::Template,
tls::TlsConfig,
transforms::{metric_to_log::MetricToLogConfig, OutputBuffer},
transforms::{
metric_to_log::{MetricToLog, MetricToLogConfig},
FunctionTransform, OutputBuffer,
},
};

/// Configuration for the `humio_metrics` sink.
Expand Down Expand Up @@ -153,9 +155,7 @@ impl SinkConfig for HumioMetricsConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let transform = self
.transform
.clone()
.build(&TransformContext::new_with_globals(cx.globals.clone()))
.await?;
.build_transform(&TransformContext::new_with_globals(cx.globals.clone()));

let sink = HumioLogsConfig {
token: self.token.clone(),
Expand Down Expand Up @@ -199,7 +199,7 @@ impl SinkConfig for HumioMetricsConfig {

pub struct HumioMetricsSink {
inner: VectorSink,
transform: Transform,
transform: MetricToLog,
}

#[async_trait]
Expand All @@ -210,7 +210,7 @@ impl StreamSink<EventArray> for HumioMetricsSink {
.run(input.map(move |events| {
let mut buf = OutputBuffer::with_capacity(events.len());
for event in events.into_events() {
transform.as_function().transform(&mut buf, event);
transform.transform(&mut buf, event);
}
// Awkward but necessary for the `EventArray` type
let events = buf.into_events().map(Event::into_log).collect::<Vec<_>>();
Expand Down
4 changes: 2 additions & 2 deletions src/sources/kubernetes_logs/parser/cri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ pub mod tests {
fn test_parsing_valid_vector_namespace() {
trace_init();
test_util::test_parser(
|| Transform::function(Cri::new(LogNamespace::Vector)),
|| Cri::new(LogNamespace::Vector),
|bytes| Event::Log(LogEvent::from(value!(bytes))),
valid_cases(LogNamespace::Vector),
);
Expand All @@ -296,7 +296,7 @@ pub mod tests {
fn test_parsing_valid_legacy_namespace() {
trace_init();
test_util::test_parser(
|| Transform::function(Cri::new(LogNamespace::Legacy)),
|| Cri::new(LogNamespace::Legacy),
|bytes| Event::Log(LogEvent::from(bytes)),
valid_cases(LogNamespace::Legacy),
);
Expand Down
12 changes: 4 additions & 8 deletions src/sources/kubernetes_logs/parser/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,8 @@ pub mod tests {
trace_init();

test_util::test_parser(
|| {
Transform::function(Docker {
log_namespace: LogNamespace::Vector,
})
|| Docker {
log_namespace: LogNamespace::Vector,
},
|bytes| Event::Log(LogEvent::from(value!(bytes))),
valid_cases(LogNamespace::Vector),
Expand All @@ -330,10 +328,8 @@ pub mod tests {
trace_init();

test_util::test_parser(
|| {
Transform::function(Docker {
log_namespace: LogNamespace::Legacy,
})
|| Docker {
log_namespace: LogNamespace::Legacy,
},
|bytes| Event::Log(LogEvent::from(bytes)),
valid_cases(LogNamespace::Legacy),
Expand Down
4 changes: 2 additions & 2 deletions src/sources/kubernetes_logs/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ mod tests {
fn test_parsing_valid_vector_namespace() {
trace_init();
test_util::test_parser(
|| Transform::function(Parser::new(LogNamespace::Vector)),
|| Parser::new(LogNamespace::Vector),
|bytes| Event::Log(LogEvent::from(value!(bytes))),
valid_cases(LogNamespace::Vector),
);
Expand All @@ -117,7 +117,7 @@ mod tests {
fn test_parsing_valid_legacy_namespace() {
trace_init();
test_util::test_parser(
|| Transform::function(Parser::new(LogNamespace::Legacy)),
|| Parser::new(LogNamespace::Legacy),
|bytes| Event::Log(LogEvent::from(bytes)),
valid_cases(LogNamespace::Legacy),
);
Expand Down
8 changes: 4 additions & 4 deletions src/sources/kubernetes_logs/parser/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vrl::value::{value, Value};
use crate::{
event::{Event, LogEvent},
sources::kubernetes_logs::Config,
transforms::{OutputBuffer, Transform},
transforms::{OutputBuffer, FunctionTransform},
};

/// Build a log event for test purposes.
Expand Down Expand Up @@ -58,15 +58,15 @@ pub fn make_log_event(
/// Shared logic for testing parsers.
///
/// Takes a parser builder and a list of test cases.
pub fn test_parser<B, L, S>(builder: B, loader: L, cases: Vec<(S, Vec<Event>)>)
pub fn test_parser<B, L, S, F>(builder: B, loader: L, cases: Vec<(S, Vec<Event>)>)
where
B: Fn() -> Transform,
B: Fn() -> F,
F: FunctionTransform,
L: Fn(S) -> Event,
{
for (message, expected) in cases {
let input = loader(message);
let mut parser = (builder)();
let parser = parser.as_function();
let mut output = OutputBuffer::default();
parser.transform(&mut output, input);

Expand Down
18 changes: 12 additions & 6 deletions src/transforms/metric_to_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ pub struct MetricToLogConfig {
pub metric_tag_values: MetricTagValues,
}

impl MetricToLogConfig {
pub fn build_transform(&self, context: &TransformContext) -> MetricToLog {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably the trait method should have been build_transform, but that's a much more invasive change.

MetricToLog::new(
self.host_tag.as_deref(),
self.timezone.unwrap_or_else(|| context.globals.timezone()),
context.log_namespace(self.log_namespace),
self.metric_tag_values,
)
}
}

impl GenerateConfig for MetricToLogConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
Expand All @@ -79,12 +90,7 @@ impl GenerateConfig for MetricToLogConfig {
#[typetag::serde(name = "metric_to_log")]
impl TransformConfig for MetricToLogConfig {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
Ok(Transform::function(MetricToLog::new(
self.host_tag.as_deref(),
self.timezone.unwrap_or_else(|| context.globals.timezone()),
context.log_namespace(self.log_namespace),
self.metric_tag_values,
)))
Ok(Transform::function(self.build_transform(context)))
}

fn input(&self) -> Input {
Expand Down