Commit

minor update on sarama config (#461)
WangXiangUSTC committed Feb 26, 2019
1 parent 50f5b19 commit 9ee817f
Showing 2 changed files with 6 additions and 44 deletions.
drainer/util.go: 3 changes (2 additions, 1 deletion)
@@ -67,7 +67,8 @@ func GenCheckPointCfg(cfg *Config, id uint64) *checkpoint.Config {
 
 func initializeSaramaGlobalConfig() {
     sarama.MaxResponseSize = int32(maxMsgSize)
-    sarama.MaxRequestSize = int32(maxMsgSize)
+    // add 1 to avoid confused log: Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored
+    sarama.MaxRequestSize = int32(maxMsgSize) + 1
 }
 
 func getDDLJob(tiStore kv.Storage, id int64) (*model.Job, error) {
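
Note on the hunk above: as the new comment quotes, sarama warns "Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored" when the producer limit is not strictly below the package-level sarama.MaxRequestSize, so setting both globals from the same maxMsgSize triggers the log. Below is a minimal sketch of that relationship; the package and helper name are illustrative, not code from this repository.

package example

import "github.com/Shopify/sarama"

// newProducerConfigSketch is an illustrative sketch (not the drainer code itself):
// deriving both globals from the same maxMsgSize and adding 1 keeps
// sarama.MaxRequestSize strictly above Producer.MaxMessageBytes, so sarama's
// config validation does not log the warning quoted in the commit.
func newProducerConfigSketch(maxMsgSize int) *sarama.Config {
    sarama.MaxResponseSize = int32(maxMsgSize)
    sarama.MaxRequestSize = int32(maxMsgSize) + 1 // strict inequality, as in the patch

    cfg := sarama.NewConfig()
    cfg.Producer.MaxMessageBytes = maxMsgSize // now strictly below MaxRequestSize
    return cfg
}
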
pkg/util/kafka.go: 47 changes (4 additions, 43 deletions)
@@ -26,42 +26,6 @@ func initMetrics() {
 	exp.Exp(metricRegistry)
 }
 
-// CreateKafkaProducer create a sync producer
-func CreateKafkaProducer(config *sarama.Config, addr []string, kafkaVersion string, maxMsgSize int, metricsPrefix string) (sarama.SyncProducer, error) {
-    var (
-        client sarama.SyncProducer
-        err    error
-    )
-
-    // initial kafka client to use manual partitioner
-    if config == nil {
-        config = sarama.NewConfig()
-    }
-    config.Producer.Partitioner = sarama.NewManualPartitioner
-    config.Producer.MaxMessageBytes = maxMsgSize
-    config.Producer.Return.Successes = true
-    config.Producer.RequiredAcks = sarama.WaitForAll
-    version, err := sarama.ParseKafkaVersion(kafkaVersion)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    config.Version = version
-    config.MetricRegistry = metrics.NewPrefixedChildRegistry(GetParentMetricsRegistry(), metricsPrefix)
-
-    log.Infof("kafka producer version %v", version)
-    for i := 0; i < maxRetry; i++ {
-        client, err = sarama.NewSyncProducer(addr, config)
-        if err != nil {
-            log.Errorf("create kafka client error %v", err)
-            time.Sleep(retryInterval)
-            continue
-        }
-        return client, nil
-    }
-
-    return nil, errors.Trace(err)
-}
-
 // GetParentMetricsRegistry get the metrics registry and expose the metrics while /debug/metrics
 func GetParentMetricsRegistry() metrics.Registry {
     metricRegistryOnce.Do(initMetrics)
@@ -77,25 +41,22 @@ func NewSaramaConfig(kafkaVersion string, metricsPrefix string) (*sarama.Config, error) {
         return nil, errors.Trace(err)
     }
 
+    config.ClientID = "tidb_binlog"
     config.Version = version
     log.Debugf("kafka consumer version %v", version)
     config.MetricRegistry = metrics.NewPrefixedChildRegistry(GetParentMetricsRegistry(), metricsPrefix)
 
     return config, nil
 }
 
 // CreateKafkaConsumer creates a kafka consumer
 func CreateKafkaConsumer(kafkaAddrs []string, kafkaVersion string) (sarama.Consumer, error) {
-    kafkaCfg := sarama.NewConfig()
-    kafkaCfg.Consumer.Return.Errors = true
-    version, err := sarama.ParseKafkaVersion(kafkaVersion)
+    kafkaCfg, err := NewSaramaConfig(kafkaVersion, "drainer.")
     if err != nil {
         return nil, errors.Trace(err)
     }
-    kafkaCfg.Version = version
-    log.Infof("kafka consumer version %v", version)
 
-    registry := GetParentMetricsRegistry()
-    kafkaCfg.MetricRegistry = metrics.NewPrefixedChildRegistry(registry, "drainer.")
+    kafkaCfg.Consumer.Return.Errors = true
 
     return sarama.NewConsumer(kafkaAddrs, kafkaCfg)
 }
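
Usage sketch for the refactored pkg/util/kafka.go: after this commit, CreateKafkaConsumer builds its config through NewSaramaConfig, so the consumer shares the "tidb_binlog" client ID, the parsed Kafka version, and a metrics registry prefixed with "drainer.". The snippet below is a hedged example of calling it; the import path, broker address, and version string are assumptions, not taken from this commit.

package example

import (
    "log"

    "github.com/pingcap/tidb-binlog/pkg/util" // assumed import path for pkg/util
)

func consumeSketch() {
    // Placeholder broker list and Kafka version string; adjust for a real cluster.
    consumer, err := util.CreateKafkaConsumer([]string{"127.0.0.1:9092"}, "0.8.2.0")
    if err != nil {
        log.Fatalf("create kafka consumer: %v", err)
    }
    defer consumer.Close()

    // consumer is a sarama.Consumer; partitions can then be consumed with
    // consumer.ConsumePartition(topic, partition, sarama.OffsetOldest).
}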
