mysql (ticdc): Improve the performance of the mysql sink by refining the transaction event batching logic (pingcap#10466)

ref pingcap#10457
hongyunyan authored and CharlesCheung96 committed Apr 26, 2024
1 parent 1b84fc6 commit bfe9c9e
Showing 3 changed files with 52 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/txn/txn_dml_sink.go
@@ -107,7 +107,7 @@ func newSink(ctx context.Context, backends []backend,
g, ctx1 := errgroup.WithContext(ctx)
for i, backend := range backends {
w := newWorker(ctx1, i, backend, len(backends))
- g.Go(func() error { return w.runLoop() })
+ g.Go(func() error { return w.run() })
sink.workers = append(sink.workers, w)
}

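For context, newSink starts one goroutine per worker via errgroup, so the renamed run method is each worker's event loop and the first worker error cancels the whole group. Below is a minimal, self-contained sketch of that fan-out pattern; the worker type and its run method here are simplified stand-ins, not the actual TiCDC types:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// worker is a simplified stand-in for the txn sink worker.
type worker struct{ id int }

// run blocks until the shared context is canceled, mirroring how each
// sink worker owns its own event loop.
func (w *worker) run(ctx context.Context) error {
	<-ctx.Done()
	fmt.Printf("worker %d stopped\n", w.id)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// errgroup.WithContext gives the workers a context that is canceled
	// as soon as any of them returns an error.
	g, gctx := errgroup.WithContext(ctx)
	for i := 0; i < 4; i++ {
		w := &worker{id: i} // one worker per goroutine, as newSink does
		g.Go(func() error { return w.run(gctx) })
	}
	if err := g.Wait(); err != nil {
		fmt.Println("worker error:", err)
	}
}
```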
81 changes: 43 additions & 38 deletions cdc/sink/dmlsink/txn/worker.go
@@ -45,7 +45,7 @@ type worker struct {
metricConflictDetectDuration prometheus.Observer
metricQueueDuration prometheus.Observer
metricTxnWorkerFlushDuration prometheus.Observer
- metricTxnWorkerBusyRatio prometheus.Counter
+ metricTxnWorkerTotalDuration prometheus.Observer
metricTxnWorkerHandledRows prometheus.Counter

// Fields only used in the background loop.
@@ -68,8 +68,8 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w

metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricQueueDuration: txn.QueueDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
- metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
- metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
+ metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
+ metricTxnWorkerTotalDuration: txn.WorkerTotalDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),

flushInterval: backend.MaxFlushInterval(),
@@ -91,8 +91,8 @@ func (w *worker) close() {
w.txnCh.CloseAndDrain()
}

- // Run a loop.
- func (w *worker) runLoop() error {
+ // Continuously get events from txnCh and call backend flush based on conditions.
+ func (w *worker) run() error {
defer func() {
if err := w.backend.Close(); err != nil {
log.Info("Transaction dmlSink backend close fail",
@@ -105,14 +105,7 @@ func (w *worker) runLoop() error {
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))

- ticker := time.NewTicker(w.flushInterval)
- defer ticker.Stop()
-
- needFlush := false
- var flushTimeSlice, totalTimeSlice time.Duration
- overseerTicker := time.NewTicker(time.Second)
- defer overseerTicker.Stop()
- startToWork := time.Now()
+ start := time.Now()
for {
select {
case <-w.ctx.Done():
@@ -121,27 +114,43 @@
zap.Int("workerID", w.ID))
return nil
case txn := <-w.txnCh.Out():
+ // We keep taking events from txnCh until there is no more data or the batch reaches a state that can be flushed.
+ // If there is no more data and the batch is not yet ready to flush,
+ // we wait up to the flush interval before flushing, to avoid many small flushes with only a few txns.
if txn.txnEvent != nil {
- needFlush = w.onEvent(txn)
- }
- case <-ticker.C:
- needFlush = true
- case now := <-overseerTicker.C:
- totalTimeSlice = now.Sub(startToWork)
- busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000)
- w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount))
- startToWork = now
- flushTimeSlice = 0
- }
- if needFlush {
- if err := w.doFlush(&flushTimeSlice); err != nil {
- log.Error("Transaction dmlSink worker exits unexpectly",
- zap.String("changefeedID", w.changefeed),
- zap.Int("workerID", w.ID),
- zap.Error(err))
- return err
+ needFlush := w.onEvent(txn)
+ if !needFlush {
+ delay := time.NewTimer(w.flushInterval)
+ for !needFlush {
+ select {
+ case txn := <-w.txnCh.Out():
+ needFlush = w.onEvent(txn)
+ case <-delay.C:
+ needFlush = true
+ }
+ }
+ // Release resources promptly
+ if !delay.Stop() {
+ select {
+ case <-delay.C:
+ default:
+ }
+ }
+ }
+ // needFlush must be true here, so we can do flush.
+ if err := w.doFlush(); err != nil {
+ log.Error("Transaction dmlSink worker exits unexpectly",
+ zap.String("changefeedID", w.changefeed),
+ zap.Int("workerID", w.ID),
+ zap.Error(err))
+ return err
}
+ // Record the total time after flushing so that the flush-time and
+ // total-time statistics cover the same span; the worker busy ratio
+ // is derived from these two durations.
+ w.metricTxnWorkerTotalDuration.Observe(time.Since(start).Seconds())
+ start = time.Now()
}
- needFlush = false
}
}
}
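The hunk above is the core of the change: instead of waking on a global ticker, the worker now batches after the first event, draining txnCh until the backend signals that a flush is needed or the flush interval expires, flushes once, and records the loop's total duration. Below is a minimal sketch of that batching loop under simplified assumptions: a plain buffered channel stands in for the chann-based txnCh, a row-count threshold stands in for the backend's flush condition, and onEvent/doFlush are hypothetical toy versions, not the real backend calls:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// event is a stand-in for one transaction event.
type event struct{ rows int }

// sketchWorker flushes once a row threshold is reached or once
// flushInterval has passed since the first buffered event.
type sketchWorker struct {
	in            chan event
	buffered      int
	maxBatchRows  int
	flushInterval time.Duration
}

// onEvent buffers one event and reports whether a flush is needed.
func (w *sketchWorker) onEvent(e event) bool {
	w.buffered += e.rows
	return w.buffered >= w.maxBatchRows
}

// doFlush stands in for backend.Flush.
func (w *sketchWorker) doFlush() error {
	fmt.Printf("flushing %d buffered rows\n", w.buffered)
	w.buffered = 0
	return nil
}

func (w *sketchWorker) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case e := <-w.in:
			needFlush := w.onEvent(e)
			if !needFlush {
				// Keep batching until the threshold is hit or the
				// flush interval expires with no further reason to wait.
				delay := time.NewTimer(w.flushInterval)
				for !needFlush {
					select {
					case e := <-w.in:
						needFlush = w.onEvent(e)
					case <-delay.C:
						needFlush = true
					}
				}
				// Stop the timer and drain its channel if it already
				// fired, so the timer's resources are released promptly.
				if !delay.Stop() {
					select {
					case <-delay.C:
					default:
					}
				}
			}
			// needFlush is always true here.
			if err := w.doFlush(); err != nil {
				return err
			}
		}
	}
}

func main() {
	w := &sketchWorker{
		in:            make(chan event, 16),
		maxBatchRows:  8,
		flushInterval: 50 * time.Millisecond,
	}
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	go func() {
		for i := 0; i < 20; i++ {
			w.in <- event{rows: 1}
		}
	}()
	_ = w.run(ctx)
}
```

With this shape, small transactions arriving close together are folded into a single backend flush, which is the batching refinement the commit title refers to.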
@@ -168,16 +177,12 @@ func (w *worker) onEvent(txn txnWithNotifier) bool {
}

// doFlush flushes the backend.
- // It returns true only if it can no longer be flushed.
- func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
+ func (w *worker) doFlush() error {
if w.hasPending {
start := time.Now()
defer func() {
- elapsed := time.Since(start)
- *flushTimeSlice += elapsed
- w.metricTxnWorkerFlushDuration.Observe(elapsed.Seconds())
+ w.metricTxnWorkerFlushDuration.Observe(time.Since(start).Seconds())
}()

if err := w.backend.Flush(w.ctx); err != nil {
log.Warn("Transaction dmlSink backend flush fail",
zap.String("changefeedID", w.changefeed),
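doFlush now observes its own latency directly in a deferred call rather than accumulating a flushTimeSlice for the removed busy-ratio metric. A compact sketch of that defer-based timing pattern; the histogram below is a hypothetical example metric, not the real WorkerFlushDuration:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// flushDuration is a hypothetical histogram used only for illustration.
var flushDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name:    "example_flush_duration_seconds",
	Help:    "Flush duration (s).",
	Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s
})

func doFlush() error {
	start := time.Now()
	// The deferred observation runs after the flush body returns,
	// so it captures the full flush latency in one place.
	defer func() {
		flushDuration.Observe(time.Since(start).Seconds())
	}()
	time.Sleep(5 * time.Millisecond) // stand-in for backend.Flush
	return nil
}

func main() {
	prometheus.MustRegister(flushDuration)
	if err := doFlush(); err != nil {
		fmt.Println("flush failed:", err)
	}
}
```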
15 changes: 8 additions & 7 deletions cdc/sink/metrics/txn/metrics.go
@@ -44,15 +44,16 @@ var (
Name: "txn_worker_flush_duration",
Help: "Flush duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s
}, []string{"namespace", "changefeed"})
}, []string{"namespace", "changefeed", "id"})

- WorkerBusyRatio = prometheus.NewCounterVec(
- prometheus.CounterOpts{
+ WorkerTotalDuration = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "txn_worker_busy_ratio",
Help: "Busy ratio (X ms in 1s) for all workers.",
}, []string{"namespace", "changefeed"})
Name: "txn_worker_total_duration",
Help: "total duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 1ms~524s
}, []string{"namespace", "changefeed", "id"})

WorkerHandledRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -94,7 +95,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ConflictDetectDuration)
registry.MustRegister(QueueDuration)
registry.MustRegister(WorkerFlushDuration)
- registry.MustRegister(WorkerBusyRatio)
+ registry.MustRegister(WorkerTotalDuration)
registry.MustRegister(WorkerHandledRows)
registry.MustRegister(SinkDMLBatchCommit)
registry.MustRegister(SinkDMLBatchCallback)
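The metrics change replaces the per-changefeed busy-ratio counter with a per-worker total-duration histogram and adds an id label, so each worker gets its own series and a busy ratio can still be derived downstream from flush duration over total duration. A small sketch of defining, registering, and pre-binding such a labeled histogram; the namespace, metric name, and label values below are illustrative, not the exact TiCDC ones:

```go
package main

import (
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// workerTotalDuration mirrors the shape of the new metric: one series
// per (namespace, changefeed, worker id) combination.
var workerTotalDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "example",
		Subsystem: "sink",
		Name:      "txn_worker_total_duration",
		Help:      "Total duration (s) for txn worker.",
		Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms~52s
	}, []string{"namespace", "changefeed", "id"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(workerTotalDuration)

	// Each worker binds its labels once and keeps the resulting observer,
	// as newWorker does with WithLabelValues(namespace, changefeed, wid).
	wid := strconv.Itoa(3)
	obs := workerTotalDuration.WithLabelValues("default", "my-changefeed", wid)

	start := time.Now()
	time.Sleep(2 * time.Millisecond) // stand-in for one run-loop iteration
	obs.Observe(time.Since(start).Seconds())
}
```

Binding the labels once in the constructor keeps the hot loop free of label lookups, which matters when the observation happens on every flushed batch.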
