sink(ticdc): add tests for enable-tidb-extension and extract the config out (pingcap#3693)
Rustin170506 authored Dec 2, 2021
1 parent 0a4d985 commit ee4a7f8
Showing 6 changed files with 512 additions and 400 deletions.
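
Note: CompleteByOpts (added below) reads all of its options from the sink URI's query string. For reference, a hypothetical sink URI exercising these options might look like the following (the host, topic, and values are illustrative, not taken from this commit):

kafka://127.0.0.1:9092/test-topic?protocol=canal-json&enable-tidb-extension=true&partition-num=3&replication-factor=1&kafka-version=2.4.0&max-message-bytes=1048576&compression=lz4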
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
@@ -380,7 +380,7 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
config := kafka.NewConfig()
- if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
+ if err := config.CompleteByOpts(sinkURI, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

297 changes: 297 additions & 0 deletions cdc/sink/producer/kafka/config.go
@@ -0,0 +1,297 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"context"
"net/url"
"strconv"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"go.uber.org/zap"
)

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
}

// Config stores user-specified Kafka producer configuration
type Config struct {
BrokerEndpoints []string
PartitionNum int32

// User should make sure that `replication-factor` is not greater than the number of Kafka brokers.
ReplicationFactor int16

Version string
MaxMessageBytes int
Compression string
ClientID string
Credential *security.Credential
SaslScram *security.SaslScram
// AutoCreate controls whether the topic is created automatically.
AutoCreate bool
}

// NewConfig returns a default Kafka configuration
func NewConfig() *Config {
return &Config{
Version: "2.4.0",
// MaxMessageBytes will be used to initialize the producer; the default value (1MB) is identical to the Kafka broker's default.
MaxMessageBytes: 1 * 1024 * 1024,
ReplicationFactor: 1,
Compression: "none",
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
AutoCreate: true,
}
}

// CompleteByOpts completes the Kafka producer configuration from the sink URI, the replica config, and opts.
func (c *Config) CompleteByOpts(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
params := sinkURI.Query()
s := params.Get("partition-num")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.PartitionNum = int32(a)
if c.PartitionNum <= 0 {
return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(c.PartitionNum)
}
}

s = params.Get("replication-factor")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.ReplicationFactor = int16(a)
}

s = params.Get("kafka-version")
if s != "" {
c.Version = s
}

s = params.Get("max-message-bytes")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.MaxMessageBytes = a
opts["max-message-bytes"] = s
}

s = params.Get("max-batch-size")
if s != "" {
opts["max-batch-size"] = s
}

s = params.Get("compression")
if s != "" {
c.Compression = s
}

c.ClientID = params.Get("kafka-client-id")

s = params.Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = params.Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = params.Get("key")
if s != "" {
c.Credential.KeyPath = s
}

s = params.Get("sasl-user")
if s != "" {
c.SaslScram.SaslUser = s
}

s = params.Get("sasl-password")
if s != "" {
c.SaslScram.SaslPassword = s
}

s = params.Get("sasl-mechanism")
if s != "" {
c.SaslScram.SaslMechanism = s
}

s = params.Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
if err != nil {
return err
}
c.AutoCreate = autoCreate
}

s = params.Get("protocol")
if s != "" {
replicaConfig.Sink.Protocol = s
}

s = params.Get("enable-tidb-extension")
if s != "" {
_, err := strconv.ParseBool(s)
if err != nil {
return err
}
if replicaConfig.Sink.Protocol != "canal-json" {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("enable-tidb-extension only supports canal-json protocol"))
}
opts["enable-tidb-extension"] = s
}

return nil
}

// setPartitionNum sets the partition-num based on the topic's real partition count.
func (c *Config) setPartitionNum(realPartitionCount int32) error {
// the user did not specify `partition-num` in the sink-uri
if c.PartitionNum == 0 {
c.PartitionNum = realPartitionCount
return nil
}

if c.PartitionNum < realPartitionCount {
log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+
"Some partitions will not have messages dispatched to",
zap.Int32("sink-uri partitions", c.PartitionNum),
zap.Int32("topic partitions", realPartitionCount))
return nil
}

// Make sure that the user-specified `partition-num` is not greater than
// the real partition count; otherwise messages would be dispatched to
// partitions that do not exist, which could cause correctness problems.
if c.PartitionNum > realPartitionCount {
return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
"the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)",
c.PartitionNum, realPartitionCount)
}
return nil
}

// newSaramaConfig returns a Sarama config with the version and other options set according to the given Config.
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()

version, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err)
}
var role string
if util.IsOwnerFromCtx(ctx) {
role = "owner"
} else {
role = "processor"
}
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)

config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID)
if err != nil {
return nil, errors.Trace(err)
}
config.Version = version
// See: https://kafka.apache.org/documentation/#replication
// When one of the brokers in a Kafka cluster is down, the partitions led by
// that broker become unavailable; Kafka elects new partition leaders and
// replicates logs, a process that can take from a few seconds to a few minutes.
// The Kafka cluster does not serve writes during this process.
// Time out after one minute (120 * 500ms).
config.Metadata.Retry.Max = 120
config.Metadata.Retry.Backoff = 500 * time.Millisecond
// If it is not set, this means a metadata request against an unreachable
// cluster (all brokers are unreachable or unresponsive) can take up to
// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) +
// Metadata.Retry.Backoff * Metadata.Retry.Max`
// to fail.
// See: https://github.com/Shopify/sarama/issues/765
// and https://github.com/pingcap/ticdc/issues/3352.
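// As an illustration (the numbers are assumptions, not from this commit):
// with sarama's default 30s dial timeout and a 3-broker cluster, the worst
// case would be roughly 30s * 3 * (120 + 1) + 0.5s * 120, i.e. about 3 hours,
// without the cap below.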
config.Metadata.Timeout = 1 * time.Minute

config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll
// Time out after five minutes (600 * 500ms).
config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond
switch strings.ToLower(strings.TrimSpace(c.Compression)) {
case "none":
config.Producer.Compression = sarama.CompressionNone
case "gzip":
config.Producer.Compression = sarama.CompressionGZIP
case "snappy":
config.Producer.Compression = sarama.CompressionSnappy
case "lz4":
config.Producer.Compression = sarama.CompressionLZ4
case "zstd":
config.Producer.Compression = sarama.CompressionZSTD
default:
log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression))
config.Producer.Compression = sarama.CompressionNone
}

// Time out after one minute (120 * 500ms).
config.Admin.Retry.Max = 120
config.Admin.Retry.Backoff = 500 * time.Millisecond
config.Admin.Timeout = 1 * time.Minute

if c.Credential != nil && len(c.Credential.CAPath) != 0 {
config.Net.TLS.Enable = true
config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
}
if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 {
config.Net.SASL.Enable = true
config.Net.SASL.User = c.SaslScram.SaslUser
config.Net.SASL.Password = c.SaslScram.SaslPassword
config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism)
if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} }
} else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} }
} else {
return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512")
}
}

return config, err
}
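
The four remaining changed files, including the new tests the commit title refers to, are not shown on this page. As a minimal sketch of how the enable-tidb-extension validation in CompleteByOpts could be exercised (the test name, URI values, and the use of config.GetDefaultReplicaConfig are assumptions, not the PR's actual test code):

package kafka

import (
	"net/url"
	"testing"

	"github.com/pingcap/ticdc/pkg/config"
)

// Hypothetical sketch: enable-tidb-extension must be rejected unless the
// protocol is canal-json, and otherwise recorded in opts.
func TestCompleteByOptsEnableTiDBExtensionSketch(t *testing.T) {
	// Non canal-json protocol: expect an error.
	uri, err := url.Parse("kafka://127.0.0.1:9092/test-topic?protocol=open-protocol&enable-tidb-extension=true")
	if err != nil {
		t.Fatal(err)
	}
	c := NewConfig()
	opts := make(map[string]string)
	if err := c.CompleteByOpts(uri, config.GetDefaultReplicaConfig(), opts); err == nil {
		t.Fatal("expected an error: enable-tidb-extension requires the canal-json protocol")
	}

	// canal-json protocol: the option is accepted and passed through opts.
	uri, err = url.Parse("kafka://127.0.0.1:9092/test-topic?protocol=canal-json&enable-tidb-extension=true")
	if err != nil {
		t.Fatal(err)
	}
	c = NewConfig()
	opts = make(map[string]string)
	if err := c.CompleteByOpts(uri, config.GetDefaultReplicaConfig(), opts); err != nil {
		t.Fatal(err)
	}
	if opts["enable-tidb-extension"] != "true" {
		t.Fatalf("expected opts to record enable-tidb-extension, got %q", opts["enable-tidb-extension"])
	}
}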