From ee4a7f889449a557081a2529da5c9265c9d015fc Mon Sep 17 00:00:00 2001
From: 0xC3
Date: Thu, 2 Dec 2021 20:55:54 +0800
Subject: [PATCH] sink(ticdc): add tests for enable-tidb-extension and extract
 the config out (#3693)

---
 cdc/sink/mq.go                         |   2 +-
 cdc/sink/producer/kafka/config.go      | 297 ++++++++++++++++++++++++
 cdc/sink/producer/kafka/config_test.go | 193 ++++++++++++++++
 cdc/sink/producer/kafka/kafka.go       | 300 ++-----------------------
 cdc/sink/producer/kafka/kafka_test.go  | 118 ----------
 pkg/errors/errors.go                   |   2 +-
 6 files changed, 512 insertions(+), 400 deletions(-)
 create mode 100644 cdc/sink/producer/kafka/config.go
 create mode 100644 cdc/sink/producer/kafka/config_test.go

diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index e6ffdafbda8..102e88e5f72 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -380,7 +380,7 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
 func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
 	config := kafka.NewConfig()
-	if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
+	if err := config.CompleteByOpts(sinkURI, replicaConfig, opts); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 	}

diff --git a/cdc/sink/producer/kafka/config.go b/cdc/sink/producer/kafka/config.go
new file mode 100644
index 00000000000..0a530dfb9de
--- /dev/null
+++ b/cdc/sink/producer/kafka/config.go
@@ -0,0 +1,297 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+import (
+	"context"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/pkg/config"
+	cerror "github.com/pingcap/ticdc/pkg/errors"
+	"github.com/pingcap/ticdc/pkg/security"
+	"github.com/pingcap/ticdc/pkg/util"
+	"go.uber.org/zap"
+)
+
+func init() {
+	sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
+}
+
+// Config stores user-specified Kafka producer configuration
+type Config struct {
+	BrokerEndpoints []string
+	PartitionNum    int32
+
+	// User should make sure that `replication-factor` is not greater than the number of Kafka brokers.
+	ReplicationFactor int16
+
+	Version         string
+	MaxMessageBytes int
+	Compression     string
+	ClientID        string
+	Credential      *security.Credential
+	SaslScram       *security.SaslScram
+	// control whether to create the topic automatically
+	AutoCreate bool
+}
+
+// NewConfig returns a default Kafka configuration
+func NewConfig() *Config {
+	return &Config{
+		Version: "2.4.0",
+		// MaxMessageBytes will be used to initialize the producer; we set the default value (1MB) identical to the Kafka broker's.
+		MaxMessageBytes:   1 * 1024 * 1024,
+		ReplicationFactor: 1,
+		Compression:       "none",
+		Credential:        &security.Credential{},
+		SaslScram:         &security.SaslScram{},
+		AutoCreate:        true,
+	}
+}
+
+// CompleteByOpts completes the Kafka producer configuration from the given
+// sink URI, replica config, and options.
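+//
+// A typical sink URI handled here might look like (illustrative values only):
+//
+//	kafka://127.0.0.1:9092,127.0.0.2:9092/topic-name?kafka-version=2.6.0&partition-num=3&max-message-bytes=1048576&compression=gzip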
+func (c *Config) CompleteByOpts(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
+	c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
+	params := sinkURI.Query()
+	s := params.Get("partition-num")
+	if s != "" {
+		a, err := strconv.Atoi(s)
+		if err != nil {
+			return err
+		}
+		c.PartitionNum = int32(a)
+		if c.PartitionNum <= 0 {
+			return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(c.PartitionNum)
+		}
+	}
+
+	s = params.Get("replication-factor")
+	if s != "" {
+		a, err := strconv.Atoi(s)
+		if err != nil {
+			return err
+		}
+		c.ReplicationFactor = int16(a)
+	}
+
+	s = params.Get("kafka-version")
+	if s != "" {
+		c.Version = s
+	}
+
+	s = params.Get("max-message-bytes")
+	if s != "" {
+		a, err := strconv.Atoi(s)
+		if err != nil {
+			return err
+		}
+		c.MaxMessageBytes = a
+		opts["max-message-bytes"] = s
+	}
+
+	s = params.Get("max-batch-size")
+	if s != "" {
+		opts["max-batch-size"] = s
+	}
+
+	s = params.Get("compression")
+	if s != "" {
+		c.Compression = s
+	}
+
+	c.ClientID = params.Get("kafka-client-id")
+
+	s = params.Get("ca")
+	if s != "" {
+		c.Credential.CAPath = s
+	}
+
+	s = params.Get("cert")
+	if s != "" {
+		c.Credential.CertPath = s
+	}
+
+	s = params.Get("key")
+	if s != "" {
+		c.Credential.KeyPath = s
+	}
+
+	s = params.Get("sasl-user")
+	if s != "" {
+		c.SaslScram.SaslUser = s
+	}
+
+	s = params.Get("sasl-password")
+	if s != "" {
+		c.SaslScram.SaslPassword = s
+	}
+
+	s = params.Get("sasl-mechanism")
+	if s != "" {
+		c.SaslScram.SaslMechanism = s
+	}
+
+	s = params.Get("auto-create-topic")
+	if s != "" {
+		autoCreate, err := strconv.ParseBool(s)
+		if err != nil {
+			return err
+		}
+		c.AutoCreate = autoCreate
+	}
+
+	s = params.Get("protocol")
+	if s != "" {
+		replicaConfig.Sink.Protocol = s
+	}
+
+	s = params.Get("enable-tidb-extension")
+	if s != "" {
+		_, err := strconv.ParseBool(s)
+		if err != nil {
+			return err
+		}
+		if replicaConfig.Sink.Protocol != "canal-json" {
+			return cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("enable-tidb-extension only supports canal-json protocol"))
+		}
+		opts["enable-tidb-extension"] = s
+	}
+
+	return nil
+}
+
+// setPartitionNum sets the partition-num by the topic's partition count.
+func (c *Config) setPartitionNum(realPartitionCount int32) error {
+	// user does not specify the `partition-num` in the sink-uri
+	if c.PartitionNum == 0 {
+		c.PartitionNum = realPartitionCount
+		return nil
+	}
+
+	if c.PartitionNum < realPartitionCount {
+		log.Warn("number of partitions specified in sink-uri is less than that of the actual topic. "+
+			"Some partitions will not have messages dispatched to them",
+			zap.Int32("sink-uri partitions", c.PartitionNum),
+			zap.Int32("topic partitions", realPartitionCount))
+		return nil
+	}
+
+	// Make sure that the user-specified `partition-num` is not greater than
+	// the real partition count; otherwise messages would be dispatched to
+	// partitions that do not exist, which could cause correctness problems.
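+	// For example, with a 3-partition topic, partition-num=4 would let the
+	// manual partitioner (see newSaramaConfig) route messages to a partition
+	// that does not exist, so such a configuration is rejected up front.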
+	if c.PartitionNum > realPartitionCount {
+		return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
+			"the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)",
+			c.PartitionNum, realPartitionCount)
+	}
+	return nil
+}
+
+// newSaramaConfig returns the default sarama config with the version and metrics set accordingly
+func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
+	config := sarama.NewConfig()
+
+	version, err := sarama.ParseKafkaVersion(c.Version)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err)
+	}
+	var role string
+	if util.IsOwnerFromCtx(ctx) {
+		role = "owner"
+	} else {
+		role = "processor"
+	}
+	captureAddr := util.CaptureAddrFromCtx(ctx)
+	changefeedID := util.ChangefeedIDFromCtx(ctx)
+
+	config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	config.Version = version
+	// See: https://kafka.apache.org/documentation/#replication
+	// When one of the brokers in a Kafka cluster is down, the partitions led
+	// by this broker lose their leader; Kafka will elect a new partition
+	// leader and replicate logs, which can take from a few seconds to a few
+	// minutes. The Kafka cluster does not serve writes during this process.
+	// Time out in one minute (120 * 500ms).
+	config.Metadata.Retry.Max = 120
+	config.Metadata.Retry.Backoff = 500 * time.Millisecond
+	// If it is not set, this means a metadata request against an unreachable
+	// cluster (all brokers are unreachable or unresponsive) can take up to
+	// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) +
+	// Metadata.Retry.Backoff * Metadata.Retry.Max`
+	// to fail.
+	// See: https://github.com/Shopify/sarama/issues/765
+	// and https://github.com/pingcap/ticdc/issues/3352.
+	config.Metadata.Timeout = 1 * time.Minute
+
+	config.Producer.Partitioner = sarama.NewManualPartitioner
+	config.Producer.MaxMessageBytes = c.MaxMessageBytes
+	config.Producer.Return.Successes = true
+	config.Producer.Return.Errors = true
+	config.Producer.RequiredAcks = sarama.WaitForAll
+	// Time out in five minutes (600 * 500ms).
+	config.Producer.Retry.Max = 600
+	config.Producer.Retry.Backoff = 500 * time.Millisecond
+	switch strings.ToLower(strings.TrimSpace(c.Compression)) {
+	case "none":
+		config.Producer.Compression = sarama.CompressionNone
+	case "gzip":
+		config.Producer.Compression = sarama.CompressionGZIP
+	case "snappy":
+		config.Producer.Compression = sarama.CompressionSnappy
+	case "lz4":
+		config.Producer.Compression = sarama.CompressionLZ4
+	case "zstd":
+		config.Producer.Compression = sarama.CompressionZSTD
+	default:
+		log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression))
+		config.Producer.Compression = sarama.CompressionNone
+	}
+
+	// Time out in one minute (120 * 500ms).
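+	// The admin settings below govern sarama ClusterAdmin requests, e.g.
+	// describing broker/topic configs and creating the topic before the
+	// changefeed starts.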
+ config.Admin.Retry.Max = 120 + config.Admin.Retry.Backoff = 500 * time.Millisecond + config.Admin.Timeout = 1 * time.Minute + + if c.Credential != nil && len(c.Credential.CAPath) != 0 { + config.Net.TLS.Enable = true + config.Net.TLS.Config, err = c.Credential.ToTLSConfig() + if err != nil { + return nil, errors.Trace(err) + } + } + if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 { + config.Net.SASL.Enable = true + config.Net.SASL.User = c.SaslScram.SaslUser + config.Net.SASL.Password = c.SaslScram.SaslPassword + config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism) + if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} } + } else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} } + } else { + return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512") + } + } + + return config, err +} diff --git a/cdc/sink/producer/kafka/config_test.go b/cdc/sink/producer/kafka/config_test.go new file mode 100644 index 00000000000..40e0276e9ff --- /dev/null +++ b/cdc/sink/producer/kafka/config_test.go @@ -0,0 +1,193 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+
+	"github.com/Shopify/sarama"
+	"github.com/pingcap/check"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/ticdc/pkg/config"
+	cerror "github.com/pingcap/ticdc/pkg/errors"
+	"github.com/pingcap/ticdc/pkg/security"
+	"github.com/pingcap/ticdc/pkg/util"
+	"github.com/pingcap/ticdc/pkg/util/testleak"
+)
+
+func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
+	defer testleak.AfterTest(c)()
+	ctx := context.Background()
+	config := NewConfig()
+	config.Version = "invalid"
+	_, err := newSaramaConfigImpl(ctx, config)
+	c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")
+
+	ctx = util.SetOwnerInCtx(ctx)
+	config.Version = "2.6.0"
+	config.ClientID = "^invalid$"
+	_, err = newSaramaConfigImpl(ctx, config)
+	c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue)
+
+	config.ClientID = "test-kafka-client"
+	compressionCases := []struct {
+		algorithm string
+		expected  sarama.CompressionCodec
+	}{
+		{"none", sarama.CompressionNone},
+		{"gzip", sarama.CompressionGZIP},
+		{"snappy", sarama.CompressionSnappy},
+		{"lz4", sarama.CompressionLZ4},
+		{"zstd", sarama.CompressionZSTD},
+		{"others", sarama.CompressionNone},
+	}
+	for _, cc := range compressionCases {
+		config.Compression = cc.algorithm
+		cfg, err := newSaramaConfigImpl(ctx, config)
+		c.Assert(err, check.IsNil)
+		c.Assert(cfg.Producer.Compression, check.Equals, cc.expected)
+	}
+
+	config.Credential = &security.Credential{
+		CAPath: "/invalid/ca/path",
+	}
+	_, err = newSaramaConfigImpl(ctx, config)
+	c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")
+
+	saslConfig := NewConfig()
+	saslConfig.Version = "2.6.0"
+	saslConfig.ClientID = "test-sasl-scram"
+	saslConfig.SaslScram = &security.SaslScram{
+		SaslUser:      "user",
+		SaslPassword:  "password",
+		SaslMechanism: sarama.SASLTypeSCRAMSHA256,
+	}
+
+	cfg, err := newSaramaConfigImpl(ctx, saslConfig)
+	c.Assert(err, check.IsNil)
+	c.Assert(cfg, check.NotNil)
+	c.Assert(cfg.Net.SASL.User, check.Equals, "user")
+	c.Assert(cfg.Net.SASL.Password, check.Equals, "password")
+	c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256"))
+}
+
+func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) {
+	defer testleak.AfterTest(c)()
+	cfg := NewConfig()
+
+	// Normal config.
+	uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" +
+		"&max-message-bytes=%s&partition-num=1&replication-factor=3" +
+		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip"
+	maxMessageSize := "4096" // 4KB
+	uri := fmt.Sprintf(uriTemplate, maxMessageSize)
+	sinkURI, err := url.Parse(uri)
+	c.Assert(err, check.IsNil)
+	opts := make(map[string]string)
+	err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts)
+	c.Assert(err, check.IsNil)
+	c.Assert(cfg.PartitionNum, check.Equals, int32(1))
+	c.Assert(cfg.ReplicationFactor, check.Equals, int16(3))
+	c.Assert(cfg.Version, check.Equals, "2.6.0")
+	c.Assert(cfg.MaxMessageBytes, check.Equals, 4096)
+	expectedOpts := map[string]string{
+		"max-message-bytes": maxMessageSize,
+		"max-batch-size":    "5",
+	}
+	for k, v := range opts {
+		c.Assert(v, check.Equals, expectedOpts[k])
+	}
+
+	// Illegal replication-factor.
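+	// A non-numeric replication-factor should fail strconv.Atoi inside
+	// CompleteByOpts and surface as an "invalid syntax" error.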
+ uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&replication-factor=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Illegal max-message-bytes. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&max-message-bytes=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Illegal enable-tidb-extension. + uri = "kafka://127.0.0.1:9092/abc?enable-tidb-extension=a&protocol=canal-json" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Illegal partition-num. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Out of range partition-num. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*") + + // Use enable-tidb-extension on other protocols. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=1&enable-tidb-extension=true" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*enable-tidb-extension only support canal-json protocol.*") + + // Test enable-tidb-extension. 
+ uri = "kafka://127.0.0.1:9092/abc?enable-tidb-extension=true&protocol=canal-json" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + opts = make(map[string]string) + err = cfg.CompleteByOpts(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(err, check.IsNil) + expectedOpts = map[string]string{ + "enable-tidb-extension": "true", + } + for k, v := range opts { + c.Assert(v, check.Equals, expectedOpts[k]) + } +} + +func (s *kafkaSuite) TestSetPartitionNum(c *check.C) { + defer testleak.AfterTest(c)() + cfg := NewConfig() + err := cfg.setPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(2)) + + cfg.PartitionNum = 1 + err = cfg.setPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(1)) + + cfg.PartitionNum = 3 + err = cfg.setPartitionNum(2) + c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) +} diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 7b5742ba66e..9668ad9c1be 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -16,7 +16,6 @@ package kafka import ( "context" "fmt" - "net/url" "regexp" "strconv" "strings" @@ -29,158 +28,28 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/sink/codec" - "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/notify" - "github.com/pingcap/ticdc/pkg/security" - "github.com/pingcap/ticdc/pkg/util" "go.uber.org/zap" ) -const defaultPartitionNum = 3 - -// Config stores user specified Kafka producer configuration -type Config struct { - BrokerEndpoints []string - PartitionNum int32 - - // User should make sure that `replication-factor` not greater than the number of kafka brokers. - ReplicationFactor int16 - - Version string - MaxMessageBytes int - Compression string - ClientID string - Credential *security.Credential - SaslScram *security.SaslScram - // control whether to create topic - AutoCreate bool -} - -// NewConfig returns a default Kafka configuration -func NewConfig() *Config { - return &Config{ - Version: "2.4.0", - // MaxMessageBytes will be used to initialize producer, we set the default value (1M) identical to kafka broker. 
- MaxMessageBytes: 1 * 1024 * 1024, - ReplicationFactor: 1, - Compression: "none", - Credential: &security.Credential{}, - SaslScram: &security.SaslScram{}, - AutoCreate: true, - } -} - -// Initialize the kafka configuration -func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error { - c.BrokerEndpoints = strings.Split(sinkURI.Host, ",") - params := sinkURI.Query() - s := params.Get("partition-num") - if s != "" { - a, err := strconv.Atoi(s) - if err != nil { - return err - } - c.PartitionNum = int32(a) - if c.PartitionNum <= 0 { - return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(c.PartitionNum) - } - } - - s = params.Get("replication-factor") - if s != "" { - a, err := strconv.Atoi(s) - if err != nil { - return err - } - c.ReplicationFactor = int16(a) - } - - s = params.Get("kafka-version") - if s != "" { - c.Version = s - } - - s = params.Get("max-message-bytes") - if s != "" { - a, err := strconv.Atoi(s) - if err != nil { - return err - } - c.MaxMessageBytes = a - opts["max-message-bytes"] = s - } - - s = params.Get("max-batch-size") - if s != "" { - opts["max-batch-size"] = s - } - - s = params.Get("compression") - if s != "" { - c.Compression = s - } - - c.ClientID = params.Get("kafka-client-id") - - s = params.Get("ca") - if s != "" { - c.Credential.CAPath = s - } - - s = params.Get("cert") - if s != "" { - c.Credential.CertPath = s - } - - s = params.Get("key") - if s != "" { - c.Credential.KeyPath = s - } - - s = params.Get("sasl-user") - if s != "" { - c.SaslScram.SaslUser = s - } - - s = params.Get("sasl-password") - if s != "" { - c.SaslScram.SaslPassword = s - } - - s = params.Get("sasl-mechanism") - if s != "" { - c.SaslScram.SaslMechanism = s - } - - s = params.Get("auto-create-topic") - if s != "" { - autoCreate, err := strconv.ParseBool(s) - if err != nil { - return err - } - c.AutoCreate = autoCreate - } - - s = params.Get("protocol") - if s != "" { - replicaConfig.Sink.Protocol = s - } - - s = params.Get("enable-tidb-extension") - if s != "" { - _, err := strconv.ParseBool(s) - if err != nil { - return err - } - if replicaConfig.Sink.Protocol != "canal-json" { - return cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("enable-tidb-extension only support canal-json")) - } - opts["enable-tidb-extension"] = s - } +const ( + // defaultPartitionNum specifies the default number of partitions when we create the topic. + defaultPartitionNum = 3 + // brokerMessageMaxBytesConfigName specifies the largest record batch size allowed by + // Kafka brokers. + // See: https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes + brokerMessageMaxBytesConfigName = "message.max.bytes" + // topicMaxMessageBytesConfigName specifies the largest record batch size allowed by + // Kafka topics. + // See: https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes + topicMaxMessageBytesConfigName = "max.message.bytes" +) - return nil -} +const ( + kafkaProducerRunning = 0 + kafkaProducerClosing = 1 +) type kafkaSaramaProducer struct { // clientLock is used to protect concurrent access of asyncClient and syncClient. 
@@ -210,11 +79,6 @@ type kafkaSaramaProducer struct { type kafkaProducerClosingFlag = int32 -const ( - kafkaProducerRunning = 0 - kafkaProducerClosing = 1 -) - func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error { k.clientLock.RLock() defer k.clientLock.RUnlock() @@ -444,7 +308,7 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara zap.String("topic", topic), zap.Any("detail", info)) } - if err := config.adjustPartitionNum(info.NumPartitions); err != nil { + if err := config.setPartitionNum(info.NumPartitions); err != nil { return errors.Trace(err) } @@ -553,10 +417,6 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, protocol codec.Pr return k, nil } -func init() { - sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB -} - var ( validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) commonInvalidChar = regexp.MustCompile(`[\?:,"]`) @@ -575,100 +435,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) ( return } -// NewSaramaConfig return the default config and set the according version and metrics -func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { - config := sarama.NewConfig() - - version, err := sarama.ParseKafkaVersion(c.Version) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) - } - var role string - if util.IsOwnerFromCtx(ctx) { - role = "owner" - } else { - role = "processor" - } - captureAddr := util.CaptureAddrFromCtx(ctx) - changefeedID := util.ChangefeedIDFromCtx(ctx) - - config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID) - if err != nil { - return nil, errors.Trace(err) - } - config.Version = version - // See: https://kafka.apache.org/documentation/#replication - // When one of the brokers in a Kafka cluster is down, the partition leaders - // in this broker is broken, Kafka will election a new partition leader and - // replication logs, this process will last from a few seconds to a few minutes. - // Kafka cluster will not provide a writing service in this process. - // Time out in one minute. - config.Metadata.Retry.Max = 120 - config.Metadata.Retry.Backoff = 500 * time.Millisecond - // If it is not set, this means a metadata request against an unreachable - // cluster (all brokers are unreachable or unresponsive) can take up to - // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + - // Metadata.Retry.Backoff * Metadata.Retry.Max` - // to fail. - // See: https://github.com/Shopify/sarama/issues/765 - // and https://github.com/pingcap/ticdc/issues/3352. - config.Metadata.Timeout = 1 * time.Minute - - config.Producer.Partitioner = sarama.NewManualPartitioner - config.Producer.MaxMessageBytes = c.MaxMessageBytes - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - config.Producer.RequiredAcks = sarama.WaitForAll - // Time out in five minutes(600 * 500ms). 
- config.Producer.Retry.Max = 600 - config.Producer.Retry.Backoff = 500 * time.Millisecond - switch strings.ToLower(strings.TrimSpace(c.Compression)) { - case "none": - config.Producer.Compression = sarama.CompressionNone - case "gzip": - config.Producer.Compression = sarama.CompressionGZIP - case "snappy": - config.Producer.Compression = sarama.CompressionSnappy - case "lz4": - config.Producer.Compression = sarama.CompressionLZ4 - case "zstd": - config.Producer.Compression = sarama.CompressionZSTD - default: - log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression)) - config.Producer.Compression = sarama.CompressionNone - } - - // Time out in one minute(120 * 500ms). - config.Admin.Retry.Max = 120 - config.Admin.Retry.Backoff = 500 * time.Millisecond - config.Admin.Timeout = 20 * time.Second - - if c.Credential != nil && len(c.Credential.CAPath) != 0 { - config.Net.TLS.Enable = true - config.Net.TLS.Config, err = c.Credential.ToTLSConfig() - if err != nil { - return nil, errors.Trace(err) - } - } - if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 { - config.Net.SASL.Enable = true - config.Net.SASL.User = c.SaslScram.SaslUser - config.Net.SASL.Password = c.SaslScram.SaslPassword - config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism) - if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") { - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} } - } else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") { - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} } - } else { - return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512") - } - } - - return config, err -} - func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) { - target := "message.max.bytes" _, controllerID, err := admin.DescribeCluster() if err != nil { return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) @@ -677,13 +444,13 @@ func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) { configEntries, err := admin.DescribeConfig(sarama.ConfigResource{ Type: sarama.BrokerResource, Name: strconv.Itoa(int(controllerID)), - ConfigNames: []string{target}, + ConfigNames: []string{brokerMessageMaxBytesConfigName}, }) if err != nil { return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - if len(configEntries) == 0 || configEntries[0].Name != target { + if len(configEntries) == 0 || configEntries[0].Name != brokerMessageMaxBytesConfigName { return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack( "since cannot find the `message.max.bytes` from the broker's configuration, " + "ticdc decline to create the topic and changefeed to prevent potential error") @@ -698,7 +465,7 @@ func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) { } func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) { - if a, ok := info.ConfigEntries["max.message.bytes"]; ok { + if a, ok := info.ConfigEntries[topicMaxMessageBytesConfigName]; ok { result, err := strconv.Atoi(*a) if err != nil { return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) @@ -708,30 +475,3 @@ func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) return getBrokerMessageMaxBytes(admin) } - -// adjust the partition-num by the topic's partition count -func (c 
*Config) adjustPartitionNum(realPartitionCount int32) error { - // user does not specify the `partition-num` in the sink-uri - if c.PartitionNum == 0 { - c.PartitionNum = realPartitionCount - return nil - } - - if c.PartitionNum < realPartitionCount { - log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+ - "Some partitions will not have messages dispatched to", - zap.Int32("sink-uri partitions", c.PartitionNum), - zap.Int32("topic partitions", realPartitionCount)) - return nil - } - - // Make sure that the user-specified `partition-num` is not greater than - // the real partition count, since messages would be dispatched to different - // partitions, this could prevent potential correctness problems. - if c.PartitionNum > realPartitionCount { - return cerror.ErrKafkaInvalidPartitionNum.GenWithStack( - "the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)", - c.PartitionNum, realPartitionCount) - } - return nil -} diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index 12403bda857..9d472feafdc 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -15,8 +15,6 @@ package kafka import ( "context" - "fmt" - "net/url" "strings" "sync" "testing" @@ -27,10 +25,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/ticdc/cdc/sink/codec" - "github.com/pingcap/ticdc/pkg/config" - cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/security" - "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/pkg/util/testleak" ) @@ -67,45 +61,6 @@ func (s *kafkaSuite) TestClientID(c *check.C) { } } -func (s *kafkaSuite) TestInitializeConfig(c *check.C) { - defer testleak.AfterTest(c) - cfg := NewConfig() - - uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" + - "&max-message-bytes=%s&partition-num=1&replication-factor=3" + - "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" - maxMessageSize := "4096" // 4kb - uri := fmt.Sprintf(uriTemplate, maxMessageSize) - - sinkURI, err := url.Parse(uri) - c.Assert(err, check.IsNil) - - replicaConfig := config.GetDefaultReplicaConfig() - - opts := make(map[string]string) - err = cfg.Initialize(sinkURI, replicaConfig, opts) - c.Assert(err, check.IsNil) - - c.Assert(cfg.PartitionNum, check.Equals, int32(1)) - c.Assert(cfg.ReplicationFactor, check.Equals, int16(3)) - c.Assert(cfg.Version, check.Equals, "2.6.0") - c.Assert(cfg.MaxMessageBytes, check.Equals, 4096) - - expectedOpts := map[string]string{ - "max-message-bytes": maxMessageSize, - "max-batch-size": "5", - } - for k, v := range opts { - c.Assert(v, check.Equals, expectedOpts[k]) - } - - uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" - sinkURI, err = url.Parse(uri) - c.Assert(err, check.IsNil) - err = cfg.Initialize(sinkURI, replicaConfig, opts) - c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*") -} - func (s *kafkaSuite) TestSaramaProducer(c *check.C) { defer testleak.AfterTest(c)() ctx, cancel := context.WithCancel(context.Background()) @@ -234,23 +189,6 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { } } -func (s *kafkaSuite) TestAdjustPartitionNum(c *check.C) { - defer testleak.AfterTest(c)() - config := NewConfig() - err := config.adjustPartitionNum(2) - c.Assert(err, check.IsNil) - c.Assert(config.PartitionNum, check.Equals, int32(2)) - - config.PartitionNum = 1 - err = 
config.adjustPartitionNum(2) - c.Assert(err, check.IsNil) - c.Assert(config.PartitionNum, check.Equals, int32(1)) - - config.PartitionNum = 3 - err = config.adjustPartitionNum(2) - c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) -} - func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { defer testleak.AfterTest(c) topic := "unit_test_2" @@ -283,62 +221,6 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers) } -func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { - defer testleak.AfterTest(c)() - ctx := context.Background() - config := NewConfig() - config.Version = "invalid" - _, err := newSaramaConfigImpl(ctx, config) - c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") - - ctx = util.SetOwnerInCtx(ctx) - config.Version = "2.6.0" - config.ClientID = "^invalid$" - _, err = newSaramaConfigImpl(ctx, config) - c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue) - - config.ClientID = "test-kafka-client" - compressionCases := []struct { - algorithm string - expected sarama.CompressionCodec - }{ - {"none", sarama.CompressionNone}, - {"gzip", sarama.CompressionGZIP}, - {"snappy", sarama.CompressionSnappy}, - {"lz4", sarama.CompressionLZ4}, - {"zstd", sarama.CompressionZSTD}, - {"others", sarama.CompressionNone}, - } - for _, cc := range compressionCases { - config.Compression = cc.algorithm - cfg, err := newSaramaConfigImpl(ctx, config) - c.Assert(err, check.IsNil) - c.Assert(cfg.Producer.Compression, check.Equals, cc.expected) - } - - config.Credential = &security.Credential{ - CAPath: "/invalid/ca/path", - } - _, err = newSaramaConfigImpl(ctx, config) - c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory") - - saslConfig := NewConfig() - saslConfig.Version = "2.6.0" - saslConfig.ClientID = "test-sasl-scram" - saslConfig.SaslScram = &security.SaslScram{ - SaslUser: "user", - SaslPassword: "password", - SaslMechanism: sarama.SASLTypeSCRAMSHA256, - } - - cfg, err := newSaramaConfigImpl(ctx, saslConfig) - c.Assert(err, check.IsNil) - c.Assert(cfg, check.NotNil) - c.Assert(cfg.Net.SASL.User, check.Equals, "user") - c.Assert(cfg.Net.SASL.Password, check.Equals, "password") - c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256")) -} - func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { defer testleak.AfterTest(c)() ctx := context.Background() diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index cb164820e73..8c8dc5117c2 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -77,6 +77,7 @@ var ( ErrKafkaNewSaramaProducer = errors.Normalize("new sarama producer", errors.RFCCodeText("CDC:ErrKafkaNewSaramaProducer")) ErrKafkaInvalidClientID = errors.Normalize("invalid kafka client ID '%s'", errors.RFCCodeText("CDC:ErrKafkaInvalidClientID")) ErrKafkaInvalidVersion = errors.Normalize("invalid kafka version", errors.RFCCodeText("CDC:ErrKafkaInvalidVersion")) + ErrKafkaInvalidConfig = errors.Normalize("kafka config invalid", errors.RFCCodeText("CDC:ErrKafkaInvalidConfig")) ErrPulsarNewProducer = errors.Normalize("new pulsar producer", errors.RFCCodeText("CDC:ErrPulsarNewProducer")) ErrPulsarSendMessage = errors.Normalize("pulsar send message failed", errors.RFCCodeText("CDC:ErrPulsarSendMessage")) ErrRedoConfigInvalid = errors.Normalize("redo log config invalid", errors.RFCCodeText("CDC:ErrRedoConfigInvalid")) @@ -90,7 +91,6 @@ var ( ErrS3StorageInitialize = errors.Normalize("new s3 storage for redo 
log", errors.RFCCodeText("CDC:ErrS3StorageInitialize")) ErrPrepareAvroFailed = errors.Normalize("prepare avro failed", errors.RFCCodeText("CDC:ErrPrepareAvroFailed")) ErrAsyncBroadcastNotSupport = errors.Normalize("Async broadcasts not supported", errors.RFCCodeText("CDC:ErrAsyncBroadcastNotSupport")) - ErrKafkaInvalidConfig = errors.Normalize("kafka config invalid", errors.RFCCodeText("CDC:ErrKafkaInvalidConfig")) ErrSinkURIInvalid = errors.Normalize("sink uri invalid", errors.RFCCodeText("CDC:ErrSinkURIInvalid")) ErrMySQLTxnError = errors.Normalize("MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError")) ErrMySQLQueryError = errors.Normalize("MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError"))