Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #394] Fix disorder of messages to Kafka #397

Merged
merged 38 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
33e77ca
collect pprof heap
pingyu Feb 11, 2024
7ceeefb
unlimit retry for pd connection
pingyu Feb 13, 2024
88bebcc
reduce record size
pingyu Feb 13, 2024
9430c95
log level: info
pingyu Feb 13, 2024
25de855
reduce data size; add grafana panel
pingyu Feb 13, 2024
20b8fad
batch
pingyu Feb 25, 2024
5eafecd
fix
pingyu Feb 25, 2024
1ec6a2b
try debug
pingyu Feb 25, 2024
7e890be
fix encoder size
pingyu Feb 25, 2024
1db9282
fix
pingyu Feb 25, 2024
bf62381
Merge branch 'fix-flow-control' into kafka-consumer-batch
pingyu Feb 25, 2024
f0d1382
MQMessage pool
pingyu Feb 26, 2024
e3addb4
Merge branch 'fix-flow-control' into kafka-consumer-batch
pingyu Feb 26, 2024
b62252f
fix release
pingyu Mar 2, 2024
671adf5
wip
pingyu Mar 2, 2024
d5af5a3
fix flaky ut
pingyu Mar 2, 2024
87e7534
logging
pingyu Mar 2, 2024
a6f8b1d
fix ut
pingyu Mar 2, 2024
3d406b3
wip
pingyu Mar 2, 2024
84578d7
Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con…
pingyu Mar 2, 2024
d0c9e75
adjust memory release parameter
pingyu Mar 3, 2024
6b64d8b
polish
pingyu Mar 3, 2024
200c939
Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con…
pingyu Mar 3, 2024
b54817c
polish
pingyu Mar 3, 2024
f568dd3
Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con…
pingyu Mar 3, 2024
20b67dd
polish
pingyu Mar 3, 2024
772feed
polish
pingyu Mar 3, 2024
e092598
Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con…
pingyu Mar 3, 2024
acf0d83
fix ut
pingyu Mar 3, 2024
1a15bc5
idempotent = true
pingyu Mar 3, 2024
7450b16
fix ut
pingyu Mar 3, 2024
d9a60bb
fix ut
pingyu Mar 3, 2024
a48608e
revert time count
pingyu Mar 4, 2024
02b5646
Merge remote-tracking branch 'upstream/main' into kafka-consumer-batch
pingyu Mar 4, 2024
2686dbe
Merge remote-tracking branch 'origin/kafka-consumer-batch' into fix-o…
pingyu Mar 4, 2024
dd9620a
polish
pingyu Mar 4, 2024
d7b9024
Merge remote-tracking branch 'origin/kafka-consumer-batch' into fix-o…
pingyu Mar 4, 2024
04a335c
Merge branch 'main' into fix-ordering
pingyu Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cdc/cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
opts := map[string]string{}
errCh := make(chan error, 1)

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
// Idempotent requires Kafka version >= 0.11.0.0
config.Idempotent = false
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
return cfg, err
}
defer func() {
kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak
}()
kafkap.NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand Down Expand Up @@ -146,6 +157,8 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
// Idempotent requires Kafka version >= 0.11.0.0
config.Idempotent = false
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
Expand Down
11 changes: 11 additions & 0 deletions cdc/cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Config struct {
SaslScram *security.SaslScram
// control whether to create topic
AutoCreate bool
// Whether to enable idempotent producer
Idempotent bool
}

// NewConfig returns a default Kafka configuration
Expand All @@ -63,6 +65,7 @@ func NewConfig() *Config {
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
AutoCreate: true,
Idempotent: true,
}
}

Expand Down Expand Up @@ -231,6 +234,14 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
// and https://github.com/tikv/migration/cdc/issues/3352.
config.Metadata.Timeout = 1 * time.Minute

// See: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#enable-idempotence
config.Producer.Idempotent = c.Idempotent
if c.Idempotent {
config.Net.MaxOpenRequests = 1
} else {
log.Warn("The idempotent producer is disabled, which may cause data reordering")
}

config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
Expand Down
3 changes: 3 additions & 0 deletions cdc/cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.AutoCreate = false
config.Idempotent = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

newSaramaConfigImplBak := NewSaramaConfigImpl
Expand Down Expand Up @@ -339,6 +340,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.AutoCreate = false
config.Idempotent = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

NewAdminClientImpl = kafka.NewMockAdminClient
Expand Down Expand Up @@ -421,6 +423,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.AutoCreate = false
config.Idempotent = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

NewAdminClientImpl = kafka.NewMockAdminClient
Expand Down
Loading