Skip to content

Commit

Permalink
fix testcase.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 29, 2021
1 parent dc3e683 commit d9abc81
Showing 1 changed file with 57 additions and 11 deletions.
68 changes: 57 additions & 11 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"net/url"
"strings"
"sync"
"testing"
"time"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/pingcap/ticdc/cdc/sink/codec"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/kafka"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.TopicPreProcess = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

newSaramaConfigImplBak := newSaramaConfigImpl
newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
Expand All @@ -141,7 +144,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
newSaramaConfigImpl = newSaramaConfigImplBak
}()

producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
c.Assert(err, check.IsNil)
c.Assert(producer.GetPartitionNum(), check.Equals, int32(2))
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -243,18 +246,46 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {

config := NewConfig()
config.PartitionNum = int32(0)
config.BrokerEndpoints = strings.Split(broker.Addr(), ",")
cfg, err := newSaramaConfigImpl(ctx, config)
c.Assert(err, check.IsNil)
num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg)

admin, err := kafka.NewAdmin(config.BrokerEndpoints, cfg)
c.Assert(err, check.IsNil)
defer admin.Close()

err = config.AdjustPartitionNum(topic, admin)
c.Assert(err, check.IsNil)
c.Assert(num, check.Equals, int32(2))
c.Assert(config.PartitionNum, check.Equals, int32(2))

err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
})
c.Assert(err, check.IsNil)

config.BrokerEndpoints = []string{""}
cfg.Metadata.Retry.Max = 1
_, err = kafkaTopicPreProcess(topic, "", config, cfg)
admin, err = kafka.NewAdmin(config.BrokerEndpoints, cfg)
c.Assert(err, check.IsNil)
defer admin.Close()

err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
})
c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers)

config.BrokerEndpoints = strings.Split(broker.Addr(), ",")
config.PartitionNum = int32(4)
_, err = kafkaTopicPreProcess(topic, broker.Addr(), config, cfg)
admin, err = kafka.NewAdmin(config.BrokerEndpoints, cfg)
c.Assert(err, check.IsNil)
defer admin.Close()

err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
})
c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
}

Expand All @@ -276,11 +307,23 @@ func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) {

config := NewConfig()
config.PartitionNum = int32(0)
config.BrokerEndpoints = strings.Split(broker.Addr(), ",")
cfg, err := newSaramaConfigImpl(ctx, config)
c.Assert(err, check.IsNil)
num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg)

admin, err := kafka.NewAdmin(config.BrokerEndpoints, cfg)
c.Assert(err, check.IsNil)
defer admin.Close()

err = config.AdjustPartitionNum(topic, admin)
c.Assert(err, check.IsNil)
c.Assert(config.PartitionNum, check.Equals, int32(4))

err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
})
c.Assert(err, check.IsNil)
c.Assert(num, check.Equals, int32(4))
}

func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
Expand Down Expand Up @@ -345,12 +388,13 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
errCh := make(chan error, 1)
config := NewConfig()
config.Version = "invalid"
_, err := NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh)
config.BrokerEndpoints = []string{"127.0.0.1:1111"}
_, err := NewKafkaSaramaProducer(ctx, "topic", config, errCh)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")

config.Version = "0.8.2.0"
config.PartitionNum = int32(-1)
_, err = NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh)
_, err = NewKafkaSaramaProducer(ctx, "topic", config, errCh)
c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
}

Expand All @@ -376,6 +420,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.TopicPreProcess = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

newSaramaConfigImplBak := newSaramaConfigImpl
newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
Expand All @@ -391,7 +436,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
}()

errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -450,9 +495,10 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.TopicPreProcess = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
Expand Down

0 comments on commit d9abc81

Please sign in to comment.