From ed98bc7f7da584f463848af4810cdc8366f0a0d9 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Mon, 19 Aug 2024 12:40:44 +0200 Subject: [PATCH] keep topic field and set it as deprecated --- .../kafka-receiver-topic-per-signal.yaml | 2 +- receiver/kafkareceiver/README.md | 2 + receiver/kafkareceiver/config.go | 5 ++ receiver/kafkareceiver/config_test.go | 71 +++++++++++++++++++ receiver/kafkareceiver/factory.go | 15 +++- receiver/kafkareceiver/testdata/config.yaml | 35 +++++++++ 6 files changed, 126 insertions(+), 4 deletions(-) diff --git a/.chloggen/kafka-receiver-topic-per-signal.yaml b/.chloggen/kafka-receiver-topic-per-signal.yaml index f28136ae97a2..750a22eb8d99 100644 --- a/.chloggen/kafka-receiver-topic-per-signal.yaml +++ b/.chloggen/kafka-receiver-topic-per-signal.yaml @@ -1,7 +1,7 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: breaking +change_type: deprecation # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) component: kafkareceiver diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 242ac0320cd7..5b524aaa2de6 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -27,6 +27,8 @@ The following settings can be optionally configured: - `brokers` (default = localhost:9092): The list of kafka brokers - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup +- `topic` The name of the kafka topic to read from. Only one telemetry type may be used for a given topic. + **Warning: this setting is deprecated in favor of the `*_topic` settings to allow configuring one topic per signal.** - `traces_topic` (default = otlp_spans): The name of the kafka topic to read traces from. - `metrics_topic` (default = otlp_metrics): The name of the kafka topic to read metrics from. - `logs_topic` (default = otlp_logs): The name of the kafka topic to read logs from. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 6e3bd97015e8..71736146935f 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -4,6 +4,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" import ( + "fmt" "time" "go.opentelemetry.io/collector/component" @@ -53,6 +54,7 @@ type Config struct { // Heartbeat interval for the Kafka consumer HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"` // The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs) + // Deprecated: use instead "traces_topic", "metrics_topic" and "logs_topic". Topic string `mapstructure:"topic"` // The name of the kafka topic to consume traces from (default "otlp_spans") TracesTopic string `mapstructure:"traces_topic"` @@ -95,5 +97,8 @@ var _ component.Config = (*Config)(nil) // Validate checks the receiver configuration is valid func (cfg *Config) Validate() error { + if cfg.Topic != "" && (cfg.LogsTopic != "" || cfg.MetricsTopic != "" || cfg.TracesTopic != "") { + return fmt.Errorf("setting 'topic' and 'logs_topic'|'metrics_topic'|'traces_topic' is not allowed") + } return nil } diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 855205574511..fc8bfe2da227 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -30,6 +30,69 @@ func TestLoadConfig(t *testing.T) { expected component.Config expectedErr error }{ + { + id: component.NewIDWithName(metadata.Type, "deprecated"), + expected: &Config{ + Topic: "spans", + Encoding: "otlp_proto", + Brokers: []string{"foo:123", "bar:456"}, + ResolveCanonicalBootstrapServersOnly: true, + ClientID: "otel-collector", + GroupID: "otel-collector", + InitialOffset: "latest", + Authentication: kafka.Authentication{ + TLS: &configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + }, + }, + }, + Metadata: kafkaexporter.Metadata{ + Full: true, + Retry: kafkaexporter.MetadataRetry{ + Max: 10, + Backoff: time.Second * 5, + }, + }, + AutoCommit: AutoCommit{ + Enable: true, + Interval: 1 * time.Second, + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "deprecated_logs"), + expected: &Config{ + Topic: "logs", + Encoding: "direct", + Brokers: []string{"coffee:123", "foobar:456"}, + ClientID: "otel-collector", + GroupID: "otel-collector", + InitialOffset: "earliest", + Authentication: kafka.Authentication{ + TLS: &configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + }, + }, + }, + Metadata: kafkaexporter.Metadata{ + Full: true, + Retry: kafkaexporter.MetadataRetry{ + Max: 10, + Backoff: time.Second * 5, + }, + }, + AutoCommit: AutoCommit{ + Enable: true, + Interval: 1 * time.Second, + }, + }, + }, { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ @@ -115,3 +178,11 @@ func TestLoadConfig(t *testing.T) { }) } } + +func TestLoadConfigError(t *testing.T) { + config := &Config{ + Topic: "spans", + TracesTopic: "spans1", + } + assert.ErrorContains(t, component.ValidateConfig(config), "setting 'topic' and 'logs_topic'|'metrics_topic'|'traces_topic' is not allowed") +} diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index 91f6877d619b..a1b13301a49c 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -140,7 +140,10 @@ func (f *kafkaReceiverFactory) createTracesReceiver( } oCfg := *(cfg.(*Config)) - if oCfg.TracesTopic == "" { + if oCfg.Topic != "" { + set.Logger.Warn("'topic' is deprecated and will be removed in a future release. Please remove it from the configuration.") + oCfg.TracesTopic = oCfg.Topic + } else if oCfg.TracesTopic == "" { oCfg.TracesTopic = defaultTracesTopic } unmarshaler := f.tracesUnmarshalers[oCfg.Encoding] @@ -166,7 +169,10 @@ func (f *kafkaReceiverFactory) createMetricsReceiver( } oCfg := *(cfg.(*Config)) - if oCfg.MetricsTopic == "" { + if oCfg.Topic != "" { + set.Logger.Warn("'topic' is deprecated and will be removed in a future release. Please remove it from the configuration.") + oCfg.MetricsTopic = oCfg.Topic + } else if oCfg.MetricsTopic == "" { oCfg.MetricsTopic = defaultMetricsTopic } unmarshaler := f.metricsUnmarshalers[oCfg.Encoding] @@ -192,7 +198,10 @@ func (f *kafkaReceiverFactory) createLogsReceiver( } oCfg := *(cfg.(*Config)) - if oCfg.LogsTopic == "" { + if oCfg.Topic != "" { + set.Logger.Warn("'topic' is deprecated and will be removed in a future release. Please remove it from the configuration.") + oCfg.LogsTopic = oCfg.Topic + } else if oCfg.LogsTopic == "" { oCfg.LogsTopic = defaultLogsTopic } unmarshaler, err := getLogsUnmarshaler(oCfg.Encoding, f.logsUnmarshalers) diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index d6c4c6a87d5c..fb5f28274cb6 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -1,3 +1,38 @@ +kafka/deprecated: + topic: spans + brokers: + - "foo:123" + - "bar:456" + resolve_canonical_bootstrap_servers_only: true + client_id: otel-collector + group_id: otel-collector + auth: + tls: + ca_file: ca.pem + cert_file: cert.pem + key_file: key.pem + metadata: + retry: + max: 10 + backoff: 5s +kafka/deprecated_logs: + topic: logs + encoding: direct + brokers: + - "coffee:123" + - "foobar:456" + client_id: otel-collector + group_id: otel-collector + initial_offset: earliest + auth: + tls: + ca_file: ca.pem + cert_file: cert.pem + key_file: key.pem + metadata: + retry: + max: 10 + backoff: 5s kafka: traces_topic: spans metrics_topic: metrics