Enable kafka zstd compression and idempotent writes (influxdata#8435)
ssoroka authored Nov 23, 2020
1 parent 39a489a commit 17cc362
Showing 7 changed files with 194 additions and 152 deletions.
94 changes: 94 additions & 0 deletions plugins/common/kafka/config.go
@@ -0,0 +1,94 @@
package kafka

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/influxdata/telegraf/plugins/common/tls"
)

// ReadConfig is for Kafka clients that read from Kafka.
type ReadConfig struct {
	Config
}

// SetConfig on the sarama.Config object from the ReadConfig struct.
func (k *ReadConfig) SetConfig(config *sarama.Config) error {
	config.Consumer.Return.Errors = true

	return k.Config.SetConfig(config)
}

// WriteConfig is for Kafka clients that write to Kafka.
type WriteConfig struct {
	Config

	RequiredAcks     int  `toml:"required_acks"`
	MaxRetry         int  `toml:"max_retry"`
	MaxMessageBytes  int  `toml:"max_message_bytes"`
	IdempotentWrites bool `toml:"idempotent_writes"`
}

// SetConfig on the sarama.Config object from the WriteConfig struct.
func (k *WriteConfig) SetConfig(config *sarama.Config) error {
	config.Producer.Return.Successes = true
	config.Producer.Idempotent = k.IdempotentWrites
	config.Producer.Retry.Max = k.MaxRetry
	if k.MaxMessageBytes > 0 {
		config.Producer.MaxMessageBytes = k.MaxMessageBytes
	}
	config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
	return k.Config.SetConfig(config)
}

// Config common to all Kafka clients.
type Config struct {
	SASLAuth
	tls.ClientConfig

	Version          string `toml:"version"`
	ClientID         string `toml:"client_id"`
	CompressionCodec int    `toml:"compression_codec"`

	// EnableTLS deprecated
	EnableTLS *bool `toml:"enable_tls"`
}

// SetConfig on the sarama.Config object from the Config struct.
func (k *Config) SetConfig(config *sarama.Config) error {
	if k.EnableTLS != nil {
		log.Printf("W! [kafka] enable_tls is deprecated, and the setting does nothing, you can safely remove it from the config")
	}
	if k.Version != "" {
		version, err := sarama.ParseKafkaVersion(k.Version)
		if err != nil {
			return err
		}

		config.Version = version
	}

	if k.ClientID != "" {
		config.ClientID = k.ClientID
	} else {
		config.ClientID = "Telegraf"
	}

	config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)

	tlsConfig, err := k.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}

	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	if err := k.SetSASLConfig(config); err != nil {
		return err
	}

	return nil
}
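The new `WriteConfig` is what lets a producer-side plugin turn on ZSTD compression and idempotent writes through one shared struct. Below is a minimal sketch of that usage — it is not part of this commit, the literal values are made up, and the extra `Net.MaxOpenRequests` line reflects sarama's own rule that an idempotent producer needs acks from all replicas, a single in-flight request per broker, and a broker version of at least 0.11 (ZSTD itself needs at least 2.1.0).

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/influxdata/telegraf/plugins/common/kafka"
)

func main() {
	// Hypothetical values; in Telegraf these would come from the plugin's TOML options.
	wc := kafka.WriteConfig{
		Config: kafka.Config{
			Version:          "2.1.0", // ZSTD requires Kafka >= 2.1.0
			ClientID:         "Telegraf",
			CompressionCodec: 4, // 4 = ZSTD
		},
		RequiredAcks:     -1, // wait for all in-sync replicas; needed for idempotence
		MaxRetry:         3,
		IdempotentWrites: true,
	}

	config := sarama.NewConfig()
	if err := wc.SetConfig(config); err != nil {
		log.Fatal(err)
	}

	// sarama refuses an idempotent producer with more than one in-flight
	// request per broker, so cap it explicitly.
	config.Net.MaxOpenRequests = 1

	if err := config.Validate(); err != nil {
		log.Fatal(err)
	}
	log.Printf("compression=%v idempotent=%v acks=%v",
		config.Producer.Compression, config.Producer.Idempotent, config.Producer.RequiredAcks)
}
```

Retries, message size, and acks otherwise follow the plugin's existing `required_acks`, `max_retry`, and `max_message_bytes` options, which `SetConfig` maps one-to-one onto the sarama producer settings.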
10 changes: 9 additions & 1 deletion plugins/inputs/kafka_consumer/README.md
@@ -35,7 +35,7 @@ and use the old zookeeper connection method.
# insecure_skip_verify = false

## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled using the "enable_tls" option.
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"

@@ -62,6 +62,14 @@ and use the old zookeeper connection method.
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"

## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"

58 changes: 12 additions & 46 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
@@ -36,15 +35,14 @@ const sampleConfig = `
# version = ""
## Optional TLS Config
# enable_tls = true
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled using the "enable_tls" option.
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"
@@ -71,6 +69,15 @@ const sampleConfig = `
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"
@@ -110,20 +117,15 @@ type semaphore chan empty

type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
ClientID string `toml:"client_id"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicTag string `toml:"topic_tag"`
Version string `toml:"version"`

kafka.SASLAuth

EnableTLS *bool `toml:"enable_tls"`
tls.ClientConfig
kafka.ReadConfig

Log telegraf.Logger `toml:"-"`

@@ -173,50 +175,14 @@ func (k *KafkaConsumer) Init() error {
}

config := sarama.NewConfig()
config.Consumer.Return.Errors = true

// Kafka version 0.10.2.0 is required for consumer groups.
config.Version = sarama.V0_10_2_0

if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return err
}

config.Version = version
}

if k.EnableTLS != nil && *k.EnableTLS {
config.Net.TLS.Enable = true
}

tlsConfig, err := k.ClientConfig.TLSConfig()
if err != nil {
return err
}

if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig

// To maintain backwards compatibility, if the enable_tls option is not
// set TLS is enabled if a non-default TLS config is used.
if k.EnableTLS == nil {
k.Log.Warnf("Use of deprecated configuration: enable_tls should be set when using TLS")
config.Net.TLS.Enable = true
}
}

if err := k.SetSASLConfig(config); err != nil {
if err := k.SetConfig(config); err != nil {
return err
}

if k.ClientID != "" {
config.ClientID = k.ClientID
} else {
config.ClientID = "Telegraf"
}

switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Consumer.Offsets.Initial = sarama.OffsetOldest
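The rewritten `Init` above is the pattern the shared package is designed for: the plugin embeds `kafka.ReadConfig`, keeps only its consumer-specific options, and hands version, client_id, TLS, and SASL handling to `SetConfig`. A simplified sketch of that shape — a hypothetical plugin, not code from this commit:

```go
package example

import (
	"github.com/Shopify/sarama"
	"github.com/influxdata/telegraf/plugins/common/kafka"
)

// ExampleConsumer is a hypothetical input plugin written against the new
// common package; kafka.ReadConfig supplies version, client_id, TLS and SASL.
type ExampleConsumer struct {
	Brokers []string `toml:"brokers"`
	Offset  string   `toml:"offset"`

	kafka.ReadConfig

	config *sarama.Config
}

func (c *ExampleConsumer) Init() error {
	config := sarama.NewConfig()

	// Kafka 0.10.2.0 is the floor for consumer groups; Config.SetConfig
	// overwrites this when `version` is set in the plugin config.
	config.Version = sarama.V0_10_2_0

	// Enables Consumer.Return.Errors and applies the shared client settings.
	if err := c.SetConfig(config); err != nil {
		return err
	}

	switch c.Offset {
	case "newest":
		config.Consumer.Offsets.Initial = sarama.OffsetNewest
	default:
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	c.config = config
	return nil
}
```

The updated tests in the next file construct `ReadConfig` literals in exactly this nested `ReadConfig{Config: kafka.Config{...}}` form.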
54 changes: 32 additions & 22 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -7,6 +7,7 @@ import (

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
@@ -68,8 +69,12 @@ func TestInit(t *testing.T) {
{
name: "parses valid version string",
plugin: &KafkaConsumer{
Version: "1.0.0",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
Version: "1.0.0",
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, plugin.config.Version, sarama.V1_0_0_0)
@@ -78,16 +83,24 @@
{
name: "invalid version string",
plugin: &KafkaConsumer{
Version: "100",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
Version: "100",
},
},
Log: testutil.Logger{},
},
initError: true,
},
{
name: "custom client_id",
plugin: &KafkaConsumer{
ClientID: "custom",
Log: testutil.Logger{},
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientID: "custom",
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.Equal(t, plugin.config.ClientID, "custom")
@@ -123,8 +136,12 @@ func TestInit(t *testing.T) {
{
name: "default tls with a tls config",
plugin: &KafkaConsumer{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
},
},
},
Log: testutil.Logger{},
},
@@ -133,24 +150,17 @@
},
},
{
name: "disable tls",
name: "Insecure tls",
plugin: &KafkaConsumer{
EnableTLS: func() *bool { v := false; return &v }(),
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
ReadConfig: kafka.ReadConfig{
Config: kafka.Config{
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: true,
},
},
},
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.False(t, plugin.config.Net.TLS.Enable)
},
},
{
name: "enable tls",
plugin: &KafkaConsumer{
EnableTLS: func() *bool { v := true; return &v }(),
Log: testutil.Logger{},
},
check: func(t *testing.T, plugin *KafkaConsumer) {
require.True(t, plugin.config.Net.TLS.Enable)
},
17 changes: 11 additions & 6 deletions plugins/outputs/kafka/README.md
@@ -72,13 +72,18 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## routing_key = "telegraf"
# routing_key = ""

## CompressionCodec represents the various compression codecs recognized by
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
## 3 : LZ4 compression
# compression_codec = 0
## 0 : None
## 1 : Gzip
## 2 : Snappy
## 3 : LZ4
## 4 : ZSTD
# compression_codec = 0

## Idempotent Writes
## If enabled, exactly one copy of each message is written.
# idempotent_writes = false

## RequiredAcks is used in Produce Requests to tell the broker how many
## replica acknowledgements it must see before responding