From d9abc81893758db8da677171cb90e3dc18e67a6d Mon Sep 17 00:00:00 2001 From: Ling Jin Date: Fri, 29 Oct 2021 22:25:54 +0800 Subject: [PATCH] fix testcase. --- cdc/sink/producer/kafka/kafka_test.go | 68 ++++++++++++++++++++++----- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index b4f72df36bd..5077c6008db 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "net/url" + "strings" "sync" "testing" "time" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/ticdc/cdc/sink/codec" "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/kafka" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/pkg/util/testleak" @@ -129,6 +131,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { config.Version = "0.9.0.0" config.PartitionNum = int32(2) config.TopicPreProcess = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") newSaramaConfigImplBak := newSaramaConfigImpl newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { @@ -141,7 +144,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { newSaramaConfigImpl = newSaramaConfigImplBak }() - producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) for i := 0; i < 100; i++ { @@ -243,18 +246,46 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { config := NewConfig() config.PartitionNum = int32(0) + config.BrokerEndpoints = strings.Split(broker.Addr(), ",") cfg, err := newSaramaConfigImpl(ctx, config) c.Assert(err, check.IsNil) - num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg) + + admin, err := kafka.NewAdmin(config.BrokerEndpoints, cfg) + c.Assert(err, check.IsNil) + defer admin.Close() + + err = config.AdjustPartitionNum(topic, admin) c.Assert(err, check.IsNil) - c.Assert(num, check.Equals, int32(2)) + c.Assert(config.PartitionNum, check.Equals, int32(2)) + err = admin.CreateTopic(topic, &sarama.TopicDetail{ + NumPartitions: config.PartitionNum, + ReplicationFactor: config.ReplicationFactor, + }) + c.Assert(err, check.IsNil) + + config.BrokerEndpoints = []string{""} cfg.Metadata.Retry.Max = 1 - _, err = kafkaTopicPreProcess(topic, "", config, cfg) + admin, err = kafka.NewAdmin(config.BrokerEndpoints, cfg) + c.Assert(err, check.IsNil) + defer admin.Close() + + err = admin.CreateTopic(topic, &sarama.TopicDetail{ + NumPartitions: config.PartitionNum, + ReplicationFactor: config.ReplicationFactor, + }) c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers) + config.BrokerEndpoints = strings.Split(broker.Addr(), ",") config.PartitionNum = int32(4) - _, err = kafkaTopicPreProcess(topic, broker.Addr(), config, cfg) + admin, err = kafka.NewAdmin(config.BrokerEndpoints, cfg) + c.Assert(err, check.IsNil) + defer admin.Close() + + err = admin.CreateTopic(topic, &sarama.TopicDetail{ + NumPartitions: config.PartitionNum, + ReplicationFactor: config.ReplicationFactor, + }) c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) } @@ -276,11 +307,23 @@ func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) { config := NewConfig() config.PartitionNum = int32(0) + config.BrokerEndpoints = strings.Split(broker.Addr(), ",") cfg, err := newSaramaConfigImpl(ctx, config) c.Assert(err, check.IsNil) - num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg) + + admin, err := kafka.NewAdmin(config.BrokerEndpoints, cfg) + c.Assert(err, check.IsNil) + defer admin.Close() + + err = config.AdjustPartitionNum(topic, admin) + c.Assert(err, check.IsNil) + c.Assert(config.PartitionNum, check.Equals, int32(4)) + + err = admin.CreateTopic(topic, &sarama.TopicDetail{ + NumPartitions: config.PartitionNum, + ReplicationFactor: config.ReplicationFactor, + }) c.Assert(err, check.IsNil) - c.Assert(num, check.Equals, int32(4)) } func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { @@ -345,12 +388,13 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { errCh := make(chan error, 1) config := NewConfig() config.Version = "invalid" - _, err := NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh) + config.BrokerEndpoints = []string{"127.0.0.1:1111"} + _, err := NewKafkaSaramaProducer(ctx, "topic", config, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") config.Version = "0.8.2.0" config.PartitionNum = int32(-1) - _, err = NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh) + _, err = NewKafkaSaramaProducer(ctx, "topic", config, errCh) c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) } @@ -376,6 +420,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { config.Version = "0.9.0.0" config.PartitionNum = int32(2) config.TopicPreProcess = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") newSaramaConfigImplBak := newSaramaConfigImpl newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { @@ -391,7 +436,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { }() errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) defer func() { err := producer.Close() c.Assert(err, check.IsNil) @@ -450,9 +495,10 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { config.Version = "0.9.0.0" config.PartitionNum = int32(2) config.TopicPreProcess = false + config.BrokerEndpoints = strings.Split(leader.Addr(), ",") errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) defer func() { err := producer.Close() c.Assert(err, check.IsNil)