Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka sink): Fix Kafka partition key on metric events #20246

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/sinks/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ pub struct KafkaSinkConfig {
))]
pub topic: Template,

/// The log field name or tag key to use for the topic key.
///
/// If the field does not exist in the log or in the tags, a blank value is used. If
/// unspecified, the key is not sent.
/// A path expression to a field of the event to use for the topic key.
///
/// If unspecified, or if the field does not exist in the log or metric, the key is not sent.
/// Kafka uses a hash of the key to choose the partition or uses round-robin if the record has
/// no key.
///
/// Metrics currently only support access to `name` and `tags`.
#[configurable(metadata(docs::advanced))]
#[configurable(metadata(docs::examples = "user_id"))]
#[configurable(metadata(docs::examples = ".my_topic"))]
Expand Down
107 changes: 102 additions & 5 deletions src/sinks/kafka/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::Bytes;
use rdkafka::message::{Header, OwnedHeaders};
use vector_lib::lookup::OwnedTargetPath;
use vrl::path::{OwnedSegment, PathPrefix};

use crate::{
internal_events::KafkaHeaderExtractionError,
Expand Down Expand Up @@ -67,14 +68,35 @@ impl RequestBuilder<(String, Event)> for KafkaRequestBuilder {
fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option<Bytes> {
key_field.and_then(|key_field| match event {
Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()),
Event::Metric(metric) => metric
.tags()
.and_then(|tags| tags.get(key_field.to_string().as_str()))
.map(|value| value.to_owned().into()),
Event::Metric(metric) => metric_get(metric, key_field).map(|value| value.to_owned().into()),
_ => None,
})
}

// A version of this logic should be moved into "Metric" as "get" analogous to
// "LogEvent" when metrics can be interpreted as "Value"s.
pub fn metric_get<'a>(metric: &'a vector_lib::event::metric::Metric, key: &OwnedTargetPath) -> Option<&'a str> {
match key.prefix {
PathPrefix::Event =>
match key.path.segments.get(0) {
Some(OwnedSegment::Field(first_field)) =>
match first_field.as_ref() {
"name" => Some(metric.name()),
"tags" => match key.path.segments.len() {
2 => match key.path.segments.get(1) {
Some(OwnedSegment::Field(second_field)) => metric.tags().as_ref().and_then(|tags| tags.get(second_field.as_ref())),
_ => None,
}
_ => None,
}
_ => metric.tags().as_ref().and_then(|tags| tags.get(key.to_string().as_str())),
}
_ => None,
}
_ => None,
}
}

fn get_timestamp_millis(event: &Event) -> Option<i64> {
match &event {
Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
Expand Down Expand Up @@ -122,8 +144,11 @@ mod tests {
use bytes::Bytes;
use rdkafka::message::Headers;

use chrono::{offset::TimeZone, DateTime, Timelike, Utc};
use similar_asserts::assert_eq;

use super::*;
use crate::event::{LogEvent, ObjectMap};
use crate::event::{LogEvent, Metric, MetricKind, MetricTags, MetricValue, ObjectMap};

#[test]
fn kafka_get_headers() {
Expand All @@ -141,4 +166,76 @@ mod tests {
assert_eq!(headers.get(1).key, "b-key");
assert_eq!(headers.get(1).value.unwrap(), "b-value".as_bytes());
}

fn ts() -> DateTime<Utc> {
Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
.single()
.and_then(|t| t.with_nanosecond(11))
.expect("invalid timestamp")
}

fn tags() -> MetricTags {
metric_tags!(
"normal_tag" => "value1",
".workaround" => "value2",
)
}

fn metric() -> Metric {
Metric::new(
"test_name",
MetricKind::Incremental,
MetricValue::Counter { value: 2.0 },
)
.with_namespace(Some("test_namespace"))
.with_tags(Some(tags()))
.with_timestamp(Some(ts()))
}

#[test]
fn kafka_get_key_from_metric_with_workaround() {
// Confirm direct reference to dot-prefixed tag names does not break
let event = Event::Metric(metric());
let key_field = OwnedTargetPath::try_from(".workaround".to_string()).unwrap();
let key_value = get_key(&event, Some(&key_field));

assert_eq!(key_value.unwrap().as_ref(), "value2".as_bytes());
}

#[test]
fn kafka_get_key_from_metric_from_name() {
let event = Event::Metric(metric());
let key_field = OwnedTargetPath::try_from(".name".to_string()).unwrap();
let key_value = get_key(&event, Some(&key_field));

assert_eq!(key_value.unwrap().as_ref(), "test_name".as_bytes());
}

#[test]
fn kafka_get_key_from_metric_from_normal_tag() {
let event = Event::Metric(metric());
let key_field = OwnedTargetPath::try_from(".tags.normal_tag".to_string()).unwrap();
let key_value = get_key(&event, Some(&key_field));

assert_eq!(key_value.unwrap().as_ref(), "value1".as_bytes());
}

#[test]
fn kafka_get_key_from_metric_from_missing_tag() {
let event = Event::Metric(metric());
let key_field = OwnedTargetPath::try_from(".tags.missing_tag".to_string()).unwrap();
let key_value = get_key(&event, Some(&key_field));

assert_eq!(key_value, None);
}

#[test]
fn kafka_get_key_from_metric_from_workaround_tag() {
// Also test that explicitly referencing a workaround tag works as expected
let event = Event::Metric(metric());
let key_field = OwnedTargetPath::try_from(".tags.\".workaround\"".to_string()).unwrap();
let key_value = get_key(&event, Some(&key_field));

assert_eq!(key_value.unwrap().as_ref(), "value2".as_bytes());
}
}
8 changes: 4 additions & 4 deletions website/cue/reference/components/sinks/base/kafka.cue
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,13 @@ base: components: sinks: kafka: configuration: {
}
key_field: {
description: """
The log field name or tag key to use for the topic key.

If the field does not exist in the log or in the tags, a blank value is used. If
unspecified, the key is not sent.
A path expression to a field of the event to use for the topic key.

If unspecified, or if the field does not exist in the log or metric, the key is not sent.
Kafka uses a hash of the key to choose the partition or uses round-robin if the record has
no key.

Metrics currently only support access to `name` and `tags`.
"""
required: false
type: string: examples: ["user_id", ".my_topic", "%my_topic"]
Expand Down
Loading