mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic #10466

Merged: 14 commits, Jan 16, 2024
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/txn/txn_dml_sink.go
@@ -110,7 +110,7 @@ func newSink(ctx context.Context,
g, ctx1 := errgroup.WithContext(ctx)
for i, backend := range backends {
w := newWorker(ctx1, changefeedID, i, backend, len(backends))
- g.Go(func() error { return w.runLoop() })
+ g.Go(func() error { return w.run() })
sink.workers = append(sink.workers, w)
}

53 changes: 28 additions & 25 deletions cdc/sink/dmlsink/txn/worker.go
@@ -45,7 +45,7 @@ type worker struct {
metricConflictDetectDuration prometheus.Observer
metricQueueDuration prometheus.Observer
metricTxnWorkerFlushDuration prometheus.Observer
metricTxnWorkerBusyRatio prometheus.Counter
metricTxnWorkerTotalDuration prometheus.Observer
metricTxnWorkerHandledRows prometheus.Counter

// Fields only used in the background loop.
@@ -69,8 +69,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,

metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricQueueDuration: txn.QueueDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
- metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
- metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
+ metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
+ metricTxnWorkerTotalDuration: txn.WorkerTotalDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),

flushInterval: backend.MaxFlushInterval(),
@@ -93,7 +93,7 @@ func (w *worker) close() {
}

// Continuously get events from txnCh and call backend flush based on conditions.
- func (w *worker) runLoop() error {
+ func (w *worker) run() error {
defer func() {
if err := w.backend.Close(); err != nil {
log.Info("Transaction dmlSink backend close fail",
@@ -106,14 +106,9 @@ func (w *worker) runLoop() error {
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))

- ticker := time.NewTicker(w.flushInterval)
- defer ticker.Stop()

needFlush := false
- var flushTimeSlice, totalTimeSlice time.Duration
- overseerTicker := time.NewTicker(time.Second)
- defer overseerTicker.Stop()
- startToWork := time.Now()
+ startToBatching := time.Now()

for {
select {
case <-w.ctx.Done():
@@ -122,27 +117,38 @@
zap.Int("workerID", w.ID))
return nil
case txn := <-w.txnCh.Out():
+ // We drain txnCh.Out() until there is no more data or until a flushable state is reached.
+ // If txnCh.Out() runs dry before a flushable state is reached, we wait for 10ms and then
+ // flush anyway, to avoid issuing many flushes that each carry only a few transactions.
if txn.txnEvent != nil {
needFlush = w.onEvent(txn)
+ if !needFlush {
CharlesCheung96 (Contributor) commented on Jan 15, 2024:

It seems that the core idea here is to ensure that the flush interval is greater than 10ms. Maybe we could record lastFlushTime at the end of each flush and check it in each ticker, for example:
case <-ticker.C:
    if time.Since(lastFlushTime) >= w.flushInterval {
        needFlush = true
    }

asddongmen (Collaborator, author) replied:

I believe both of these methods are feasible, and I'm wondering if the current code might be a bit more straightforward and easier to understand?

CharlesCheung96:

In general, the nested logic is more complex.

asddongmen:

I think it's ok here orz
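
For reference, a compilable sketch of the ticker-based alternative discussed above. Everything here (tickerLoop, events, onEvent, doFlush) is a hypothetical stand-in for the worker's actual fields, not code from this PR:

package sketch

import "time"

// tickerLoop rate-limits flushes with a single ticker plus a lastFlushTime
// check, instead of re-arming a time.After for every batch.
func tickerLoop(events chan int, flushInterval time.Duration,
    onEvent func(int) bool, doFlush func() error) error {
    lastFlushTime := time.Now()
    ticker := time.NewTicker(flushInterval)
    defer ticker.Stop()
    needFlush := false
    for {
        select {
        case ev := <-events:
            needFlush = onEvent(ev)
        case <-ticker.C:
            // Flush only if at least flushInterval has passed since the last flush.
            if time.Since(lastFlushTime) >= flushInterval {
                needFlush = true
            }
        }
        if needFlush {
            if err := doFlush(); err != nil {
                return err
            }
            lastFlushTime = time.Now()
            needFlush = false
        }
    }
}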

+ delay := time.After(w.flushInterval)
+ for !needFlush {
+ select {
+ case txn := <-w.txnCh.Out():
+ needFlush = w.onEvent(txn)
+ case <-delay:
+ needFlush = true
+ }
+ }
Contributor:

If it ends the if block that starts at line 125, I think we can flush here directly.

asddongmen:

Do you mean that we can move L139-L151 to L135?

Contributor:

Yes.

}
}
- case <-ticker.C:
- needFlush = true
- case now := <-overseerTicker.C:
- totalTimeSlice = now.Sub(startToWork)
- busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000)
- w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount))
- startToWork = now
- flushTimeSlice = 0
}
if needFlush {
- if err := w.doFlush(&flushTimeSlice); err != nil {
+ if err := w.doFlush(); err != nil {
log.Error("Transaction dmlSink worker exits unexpectedly",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID),
zap.Error(err))
return err
}
needFlush = false
+ // We record the total time here to calculate the worker busy ratio;
+ // recording it right after flushing keeps the flush-time and total-time
+ // statistics aligned.
+ w.metricTxnWorkerTotalDuration.Observe(time.Since(startToBatching).Seconds())
+ startToBatching = time.Now()
}
}
}
@@ -169,15 +175,12 @@ func (w *worker) onEvent(txn txnWithNotifier) bool {
}

// doFlush flushes the backend.
- func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
+ func (w *worker) doFlush() error {
if w.hasPending {
start := time.Now()
defer func() {
- elapsed := time.Since(start)
- *flushTimeSlice += elapsed
- w.metricTxnWorkerFlushDuration.Observe(elapsed.Seconds())
+ w.metricTxnWorkerFlushDuration.Observe(time.Since(start).Seconds())
}()

if err := w.backend.Flush(w.ctx); err != nil {
log.Warn("Transaction dmlSink backend flush fail",
zap.String("changefeedID", w.changefeed),
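Taken together, the merged control flow batches like this. The following is a minimal, self-contained sketch of the pattern only, using simplified stand-ins: a plain channel instead of txnCh, strings instead of txnWithNotifier, and a size-8 threshold in place of the backend's real flush condition:

package main

import (
    "fmt"
    "time"
)

// batchLoop drains events until either the batch reaches a flushable state
// or flushInterval elapses since the batch started, then flushes the whole
// batch in one call, mirroring the delay / for-!needFlush shape of the PR.
func batchLoop(events chan string, flushInterval time.Duration, done chan struct{}) {
    batch := make([]string, 0, 16)
    flush := func() {
        if len(batch) > 0 {
            fmt.Println("flushing", len(batch), "events")
            batch = batch[:0]
        }
    }
    for {
        select {
        case <-done:
            flush()
            return
        case ev := <-events:
            batch = append(batch, ev)
            needFlush := len(batch) >= 8 // stand-in for the backend's flush condition
            if !needFlush {
                delay := time.After(flushInterval)
                for !needFlush {
                    select {
                    case ev := <-events:
                        batch = append(batch, ev)
                        needFlush = len(batch) >= 8
                    case <-delay:
                        needFlush = true
                    }
                }
            }
            flush()
        }
    }
}

func main() {
    events := make(chan string)
    done := make(chan struct{})
    go batchLoop(events, 10*time.Millisecond, done)
    for i := 0; i < 20; i++ {
        events <- fmt.Sprintf("txn-%d", i)
    }
    time.Sleep(20 * time.Millisecond) // let the timer-driven flush drain the tail
    close(done)
    time.Sleep(time.Millisecond)
}

Expected behavior: two size-8 flushes driven by the threshold, then a flush of the remaining 4 events once the 10ms delay fires.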
15 changes: 8 additions & 7 deletions cdc/sink/metrics/txn/metrics.go
@@ -44,15 +44,16 @@ var (
Name: "txn_worker_flush_duration",
Help: "Flush duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s
- }, []string{"namespace", "changefeed"})
+ }, []string{"namespace", "changefeed", "id"})

- WorkerBusyRatio = prometheus.NewCounterVec(
- prometheus.CounterOpts{
+ WorkerTotalDuration = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sink",
- Name: "txn_worker_busy_ratio",
- Help: "Busy ratio (X ms in 1s) for all workers.",
- }, []string{"namespace", "changefeed"})
+ Name: "txn_worker_total_duration",
+ Help: "Total duration (s) for txn worker.",
+ Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms~52s
+ }, []string{"namespace", "changefeed", "id"})

WorkerHandledRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -94,7 +95,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ConflictDetectDuration)
registry.MustRegister(QueueDuration)
registry.MustRegister(WorkerFlushDuration)
- registry.MustRegister(WorkerBusyRatio)
+ registry.MustRegister(WorkerTotalDuration)
registry.MustRegister(WorkerHandledRows)
registry.MustRegister(SinkDMLBatchCommit)
registry.MustRegister(SinkDMLBatchCallback)
4 changes: 2 additions & 2 deletions metrics/grafana/ticdc.json
@@ -6482,9 +6482,9 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_sink_txn_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (namespace,changefeed,instance)",
"expr": "sum(rate(ticdc_sink_txn_worker_flush_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) /sum(rate(ticdc_sink_txn_worker_total_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) *100",
"interval": "",
"legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}",
"legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}-worker-{{id}}",
"queryType": "randomWalk",
"refId": "A"
}
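A note on the dashboard change, since the one-line JSON is hard to read: the old panel plotted the busy-ratio counter directly, while the new expression derives the same percentage from the _sum series of the two histograms. Ignoring the label matchers, the new query is equivalent to:

sum(rate(ticdc_sink_txn_worker_flush_duration_sum[1m])) by (namespace, changefeed, instance, id)
  / sum(rate(ticdc_sink_txn_worker_total_duration_sum[1m])) by (namespace, changefeed, instance, id)
  * 100

rate(..._sum[1m]) gives seconds spent per second, so the quotient is the fraction of each worker's wall-clock time spent flushing, and multiplying by 100 turns it into a percentage; the new id label breaks the ratio down per worker.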