cdc/metrics: Integrate sarama producer metrics (#4520) (#4574)
close #4561
ti-chi-bot authored Feb 22, 2022
1 parent 98ee360 commit 1400e37
Showing 6 changed files with 3,966 additions and 1,173 deletions.
2 changes: 2 additions & 0 deletions cdc/metrics.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/puller"
 	redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
 	"github.com/pingcap/tiflow/cdc/sink"
+	"github.com/pingcap/tiflow/cdc/sink/producer/kafka"
 	"github.com/pingcap/tiflow/cdc/sorter/memory"
 	"github.com/pingcap/tiflow/cdc/sorter/unified"
 	"github.com/pingcap/tiflow/pkg/actor"
@@ -51,4 +52,5 @@ func init() {
 	memory.InitMetrics(registry)
 	unified.InitMetrics(registry)
 	redowriter.InitMetrics(registry)
+	kafka.InitMetrics(registry)
 }
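
With kafka.InitMetrics hooked into the shared registry, the new producer gauges ride along with every other TiCDC metric that Prometheus scrapes. As a minimal illustration of the pattern (the HTTP wiring and the :8300 address below are assumptions for the sketch, not TiCDC's actual server code):

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// A dedicated registry, as cdc/metrics.go uses, instead of the global default.
	registry := prometheus.NewRegistry()

	// Each subsystem registers its collectors exactly once at startup;
	// kafka.InitMetrics(registry) follows this same pattern.
	batchSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "kafka_producer_batch_size",
		Help:      "the number of bytes sent per partition per request for all topics",
	}, []string{"capture", "changefeed"})
	registry.MustRegister(batchSize)

	// Expose only this registry's metrics for Prometheus to scrape.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":8300", nil)
}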
17 changes: 11 additions & 6 deletions cdc/owner/ddl_sink.go
@@ -27,6 +27,7 @@ import (
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/filter"
+	"github.com/pingcap/tiflow/pkg/util"
 	"go.uber.org/zap"
 )

@@ -90,7 +91,9 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
 		return errors.Trace(err)
 	}
 
-	s, err := sink.New(ctx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
+	stdCtx := util.PutChangefeedIDInCtx(ctx, id)
+	stdCtx = util.PutRoleInCtx(stdCtx, util.RoleOwner)
+	s, err := sink.New(stdCtx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -99,13 +102,13 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
 	if !info.SyncPointEnabled {
 		return nil
 	}
-	syncPointStore, err := sink.NewSyncpointStore(ctx, id, info.SinkURI)
+	syncPointStore, err := sink.NewSyncpointStore(stdCtx, id, info.SinkURI)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	a.syncPointStore = syncPointStore
 
-	if err := a.syncPointStore.CreateSynctable(ctx); err != nil {
+	if err := a.syncPointStore.CreateSynctable(stdCtx); err != nil {
 		return errors.Trace(err)
 	}
 	return nil
@@ -121,11 +124,13 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
 
 	start := time.Now()
 	if err := s.sinkInitHandler(ctx, s, id, info); err != nil {
-		log.Warn("ddl sink initialize failed", zap.Duration("elapsed", time.Since(start)))
+		log.Warn("ddl sink initialize failed",
+			zap.Duration("duration", time.Since(start)))
 		ctx.Throw(err)
 		return
 	}
-	log.Info("ddl sink initialized, start processing...", zap.Duration("elapsed", time.Since(start)))
+	log.Info("ddl sink initialized, start processing...",
+		zap.Duration("duration", time.Since(start)))
 
 	// TODO make the tick duration configurable
 	ticker := time.NewTicker(time.Second)
@@ -163,7 +168,7 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
 			}
 			// If the DDL execution failed and the error cannot be ignored, throw an error and pause the changefeed
 			log.Error("Execute DDL failed",
-				zap.String("ChangeFeedID", ctx.ChangefeedVars().ID),
+				zap.String("changefeed", ctx.ChangefeedVars().ID),
 				zap.Error(err),
 				zap.Reflect("ddl", ddl))
 			ctx.Throw(errors.Trace(err))
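
The stdCtx plumbing above stashes the changefeed ID and the owner role as context values, so downstream components such as the Kafka producer can recover them without threading new function parameters through every layer. A minimal sketch of how helpers like util.PutChangefeedIDInCtx and util.CaptureAddrFromCtx are typically written — the key type and the use of plain strings here are assumptions, not tiflow's actual implementation:

package util

import "context"

type ctxKey string

const (
	ctxKeyChangefeedID ctxKey = "changefeedID"
	ctxKeyCaptureAddr  ctxKey = "captureAddr"
)

// PutChangefeedIDInCtx returns a child context carrying the changefeed ID.
func PutChangefeedIDInCtx(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, ctxKeyChangefeedID, id)
}

// CaptureAddrFromCtx extracts the capture address, or "" if it was never set.
func CaptureAddrFromCtx(ctx context.Context) string {
	addr, ok := ctx.Value(ctxKeyCaptureAddr).(string)
	if !ok {
		return ""
	}
	return addr
}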
32 changes: 28 additions & 4 deletions cdc/sink/producer/kafka/kafka.go
@@ -38,7 +38,13 @@ import (
 	"go.uber.org/zap"
 )
 
-const defaultPartitionNum = 3
+const (
+	// defaultPartitionNum specifies the default number of partitions when we create the topic.
+	defaultPartitionNum = 3
+
+	// flushMetricsInterval specifies the interval at which sarama metrics are refreshed.
+	flushMetricsInterval = 5 * time.Second
+)
 
 // Config stores user-specified Kafka producer configuration
 type Config struct {
@@ -231,8 +237,11 @@ type kafkaSaramaProducer struct {
 	closeCh chan struct{}
 	// atomic flag indicating whether the producer is closing
 	closing kafkaProducerClosingFlag
-	role util.Role
-	id   model.ChangeFeedID
+
+	role util.Role
+	id   model.ChangeFeedID
+
+	metricsMonitor *saramaMetricsMonitor
 }
 
 type kafkaProducerClosingFlag = int32
@@ -415,6 +424,8 @@ func (k *kafkaSaramaProducer) Close() error {
 		log.Info("sync client closed", zap.Duration("duration", time.Since(start)),
 			zap.String("changefeed", k.id), zap.Any("role", k.role))
 	}
+
+	k.metricsMonitor.Cleanup()
 	return nil
 }
 
@@ -425,12 +436,17 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
 			zap.String("changefeed", k.id), zap.Any("role", k.role))
 		k.stop()
 	}()
+
+	ticker := time.NewTicker(flushMetricsInterval)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-k.closeCh:
 			return nil
+		case <-ticker.C:
+			k.metricsMonitor.CollectMetrics()
 		case err := <-k.failpointCh:
 			log.Warn("receive from failpoint chan", zap.Error(err),
 				zap.String("changefeed", k.id), zap.Any("role", k.role))
@@ -603,6 +619,12 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o
 		closeCh:     make(chan struct{}),
 		failpointCh: make(chan error, 1),
 		closing:     kafkaProducerRunning,
+
+		id:   changefeedID,
+		role: role,
+
+		metricsMonitor: NewSaramaMetricsMonitor(cfg.MetricRegistry,
+			util.CaptureAddrFromCtx(ctx), changefeedID),
 	}
 	go func() {
 		if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
@@ -611,10 +633,12 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o
 				return
 			case errCh <- err:
 			default:
-				log.Error("error channel is full", zap.Error(err))
+				log.Error("error channel is full", zap.Error(err),
+					zap.String("changefeed", k.id), zap.Any("role", role))
 			}
 		}
 	}()
+
 	return k, nil
 }

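Where does cfg.MetricRegistry come from? sarama.NewConfig() installs a fresh go-metrics registry in that field, and the client library updates its instruments ("batch-size", "record-send-rate", and so on) on every produce request. A sketch of that wiring under those assumptions — newMonitorSketch is a purely illustrative helper, not part of the real producer:

package kafka

import (
	"context"

	"github.com/Shopify/sarama"

	"github.com/pingcap/tiflow/pkg/util"
)

// newMonitorSketch shows the wiring only; NewKafkaSaramaProducer does the
// equivalent with the fully tuned config it builds for the real producer.
func newMonitorSketch(ctx context.Context, changefeedID string) *saramaMetricsMonitor {
	// sarama.NewConfig installs a fresh go-metrics registry in
	// cfg.MetricRegistry; passing cfg to sarama.NewAsyncProducer means
	// that registry reflects live producer traffic.
	cfg := sarama.NewConfig()
	return NewSaramaMetricsMonitor(cfg.MetricRegistry,
		util.CaptureAddrFromCtx(ctx), changefeedID)
}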
118 changes: 118 additions & 0 deletions cdc/sink/producer/kafka/metrics.go
@@ -0,0 +1,118 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/rcrowley/go-metrics"
)

var (
	// batch-size
	batchSizeGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_batch_size",
			Help:      "the number of bytes sent per partition per request for all topics",
		}, []string{"capture", "changefeed"})

	// record-send-rate
	recordSendRateGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_record_send_rate",
			Help:      "Records/second sent to all topics",
		}, []string{"capture", "changefeed"})

	// records-per-request
	recordPerRequestGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_records_per_request",
			Help:      "the number of records sent per request for all topics",
		}, []string{"capture", "changefeed"})

	// compression-ratio
	compressionRatioGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_compression_ratio",
			Help:      "the compression ratio times 100 of record batches for all topics",
		}, []string{"capture", "changefeed"})
)

// InitMetrics registers all metrics in this file.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(batchSizeGauge)
	registry.MustRegister(recordSendRateGauge)
	registry.MustRegister(recordPerRequestGauge)
	registry.MustRegister(compressionRatioGauge)
}

// sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview
const (
	batchSizeMetricName        = "batch-size"
	recordSendRateMetricName   = "record-send-rate"
	recordPerRequestMetricName = "records-per-request"
	compressionRatioMetricName = "compression-ratio"
)

type saramaMetricsMonitor struct {
	captureAddr  string
	changefeedID string

	registry metrics.Registry
}

// CollectMetrics collects all monitored metrics.
func (sm *saramaMetricsMonitor) CollectMetrics() {
	batchSizeMetric := sm.registry.Get(batchSizeMetricName)
	if histogram, ok := batchSizeMetric.(metrics.Histogram); ok {
		batchSizeGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
	}

	recordSendRateMetric := sm.registry.Get(recordSendRateMetricName)
	if meter, ok := recordSendRateMetric.(metrics.Meter); ok {
		recordSendRateGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(meter.Rate1())
	}

	recordPerRequestMetric := sm.registry.Get(recordPerRequestMetricName)
	if histogram, ok := recordPerRequestMetric.(metrics.Histogram); ok {
		recordPerRequestGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
	}

	compressionRatioMetric := sm.registry.Get(compressionRatioMetricName)
	if histogram, ok := compressionRatioMetric.(metrics.Histogram); ok {
		compressionRatioGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
	}
}

// NewSaramaMetricsMonitor creates a monitor that mirrors the given sarama
// registry into the Prometheus gauges above.
func NewSaramaMetricsMonitor(registry metrics.Registry, captureAddr, changefeedID string) *saramaMetricsMonitor {
	return &saramaMetricsMonitor{
		captureAddr:  captureAddr,
		changefeedID: changefeedID,
		registry:     registry,
	}
}

// Cleanup deletes the gauge label values owned by this monitor.
func (sm *saramaMetricsMonitor) Cleanup() {
	batchSizeGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
	recordSendRateGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
	recordPerRequestGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
	compressionRatioGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
}
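
The Get-and-type-assert pattern is easy to exercise against a hand-built go-metrics registry; the sample size, addresses, and values below are illustrative only:

package kafka

import (
	"testing"

	"github.com/rcrowley/go-metrics"
)

func TestCollectMetricsSketch(t *testing.T) {
	// Build a registry the way sarama would, with a "batch-size" histogram.
	registry := metrics.NewRegistry()
	h := metrics.NewHistogram(metrics.NewUniformSample(1024))
	_ = registry.Register(batchSizeMetricName, h)
	h.Update(512)
	h.Update(1024)

	monitor := NewSaramaMetricsMonitor(registry, "127.0.0.1:8300", "test-changefeed")
	// Copies histogram.Mean() (here 768) into the Prometheus gauge.
	monitor.CollectMetrics()
	// Releases the gauge's label values when the producer shuts down.
	monitor.Cleanup()
}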
1 change: 1 addition & 0 deletions go.mod
@@ -62,6 +62,7 @@ require (
 	github.com/prometheus/client_model v0.2.0
 	github.com/r3labs/diff v1.1.0
 	github.com/rakyll/statik v0.1.7
+	github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
 	github.com/shopspring/decimal v1.3.0
 	github.com/soheilhy/cmux v0.1.5
 	github.com/spf13/cobra v1.2.1