From 9db085f4a5711e5123e41368cccac63731a191d0 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 11:49:01 +0800 Subject: [PATCH 01/12] fix batched flush --- cdc/sink/dmlsink/txn/worker.go | 39 +++++++++++++++++++-------------- cdc/sink/metrics/txn/metrics.go | 15 +++++++------ metrics/grafana/ticdc.json | 4 ++-- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index 6fae294ddf3..b96e0c829bd 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -45,7 +45,7 @@ type worker struct { metricConflictDetectDuration prometheus.Observer metricQueueDuration prometheus.Observer metricTxnWorkerFlushDuration prometheus.Observer - metricTxnWorkerBusyRatio prometheus.Counter + metricTxnWorkerTotalDuration prometheus.Observer metricTxnWorkerHandledRows prometheus.Counter // Fields only used in the background loop. @@ -69,8 +69,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID, metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricQueueDuration: txn.QueueDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID), - metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID), - metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID), + metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid), + metricTxnWorkerTotalDuration: txn.WorkerTotalDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid), metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid), flushInterval: backend.MaxFlushInterval(), @@ -110,10 +110,8 @@ func (w *worker) runLoop() error { defer ticker.Stop() needFlush := false - var flushTimeSlice, totalTimeSlice time.Duration - overseerTicker := time.NewTicker(time.Second) - defer overseerTicker.Stop() startToWork := time.Now() + for { select { case <-w.ctx.Done(): @@ -122,20 +120,26 @@ func (w *worker) runLoop() error { zap.Int("workerID", w.ID)) return nil case txn := <-w.txnCh.Out(): + // we get the data from txnCh.out until no more data here or reached the state that can be flushed. if txn.txnEvent != nil { needFlush = w.onEvent(txn) + if !needFlush { + for txn = range w.txnCh.Out() { + if txn.txnEvent == nil { + break + } + needFlush = w.onEvent(txn) + if needFlush { + break + } + } + } } case <-ticker.C: needFlush = true - case now := <-overseerTicker.C: - totalTimeSlice = now.Sub(startToWork) - busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000) - w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount)) - startToWork = now - flushTimeSlice = 0 } if needFlush { - if err := w.doFlush(&flushTimeSlice); err != nil { + if err := w.doFlush(); err != nil { log.Error("Transaction dmlSink worker exits unexpectly", zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID), @@ -143,6 +147,11 @@ func (w *worker) runLoop() error { return err } needFlush = false + // we record total time to calcuate the worker busy ratio. + // so we record the total time after flushing, to unified statistics on + // flush time and total time + w.metricTxnWorkerTotalDuration.Observe(time.Now().Sub(startToWork).Seconds()) + startToWork = time.Now() } } } @@ -170,15 +179,13 @@ func (w *worker) onEvent(txn txnWithNotifier) bool { // doFlush flushes the backend. // It returns true only if it can no longer be flushed. -func (w *worker) doFlush(flushTimeSlice *time.Duration) error { +func (w *worker) doFlush() error { if w.hasPending { start := time.Now() defer func() { elapsed := time.Since(start) - *flushTimeSlice += elapsed w.metricTxnWorkerFlushDuration.Observe(elapsed.Seconds()) }() - if err := w.backend.Flush(w.ctx); err != nil { log.Warn("Transaction dmlSink backend flush fail", zap.String("changefeedID", w.changefeed), diff --git a/cdc/sink/metrics/txn/metrics.go b/cdc/sink/metrics/txn/metrics.go index 62eef252ddd..42822b44cf7 100644 --- a/cdc/sink/metrics/txn/metrics.go +++ b/cdc/sink/metrics/txn/metrics.go @@ -44,15 +44,16 @@ var ( Name: "txn_worker_flush_duration", Help: "Flush duration (s) for txn worker.", Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s - }, []string{"namespace", "changefeed"}) + }, []string{"namespace", "changefeed", "id"}) - WorkerBusyRatio = prometheus.NewCounterVec( - prometheus.CounterOpts{ + WorkerTotalDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Namespace: "ticdc", Subsystem: "sink", - Name: "txn_worker_busy_ratio", - Help: "Busy ratio (X ms in 1s) for all workers.", - }, []string{"namespace", "changefeed"}) + Name: "txn_worker_total_duration", + Help: "total duration (s) for txn worker.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 1ms~524s + }, []string{"namespace", "changefeed", "id"}) WorkerHandledRows = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -94,7 +95,7 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(ConflictDetectDuration) registry.MustRegister(QueueDuration) registry.MustRegister(WorkerFlushDuration) - registry.MustRegister(WorkerBusyRatio) + registry.MustRegister(WorkerTotalDuration) registry.MustRegister(WorkerHandledRows) registry.MustRegister(SinkDMLBatchCommit) registry.MustRegister(SinkDMLBatchCallback) diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 44a31432071..b26992dae3e 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -6482,9 +6482,9 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(ticdc_sink_txn_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (namespace,changefeed,instance)", + "expr": "sum(rate(ticdc_sink_txn_worker_flush_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) /sum(rate(ticdc_sink_txn_worker_total_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) *100", "interval": "", - "legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}", + "legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}-worker-{{id}}", "queryType": "randomWalk", "refId": "A" } From fcdf0603e32bb9feab290988ea5d773913729c10 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 15:31:54 +0800 Subject: [PATCH 02/12] update --- cdc/sink/dmlsink/txn/worker.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index ff8bb4bf84f..930e717c869 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -123,15 +123,12 @@ func (w *worker) runLoop() error { // we get the data from txnCh.out until no more data here or reached the state that can be flushed. if txn.txnEvent != nil { needFlush = w.onEvent(txn) - if !needFlush { - for txn = range w.txnCh.Out() { - if txn.txnEvent == nil { - break - } + for !needFlush { + select { + case txn := <-w.txnCh.Out(): needFlush = w.onEvent(txn) - if needFlush { - break - } + default: + break } } } From 0ac4ff51a42959462c851b572b8c7db377f552c4 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 16:13:35 +0800 Subject: [PATCH 03/12] update --- cdc/sink/dmlsink/txn/worker.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index 930e717c869..12f8f02f0a8 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -106,9 +106,6 @@ func (w *worker) runLoop() error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) - ticker := time.NewTicker(w.flushInterval) - defer ticker.Stop() - needFlush := false startToWork := time.Now() @@ -121,19 +118,23 @@ func (w *worker) runLoop() error { return nil case txn := <-w.txnCh.Out(): // we get the data from txnCh.out until no more data here or reached the state that can be flushed. + // If no more daa in txnCh.out, and also not reached the state that can be flushed, + // we will wait for 10ms and then do flush to avoid too much flush wish small amount of txns. if txn.txnEvent != nil { needFlush = w.onEvent(txn) - for !needFlush { - select { - case txn := <-w.txnCh.Out(): - needFlush = w.onEvent(txn) - default: - break + if !needFlush { + delay := time.After(w.flushInterval) + loop: + for !needFlush { + select { + case txn := <-w.txnCh.Out(): + needFlush = w.onEvent(txn) + case <-delay: + break loop + } } } } - case <-ticker.C: - needFlush = true } if needFlush { if err := w.doFlush(); err != nil { @@ -147,7 +148,7 @@ func (w *worker) runLoop() error { // we record total time to calcuate the worker busy ratio. // so we record the total time after flushing, to unified statistics on // flush time and total time - w.metricTxnWorkerTotalDuration.Observe(time.Now().Sub(startToWork).Seconds()) + w.metricTxnWorkerTotalDuration.Observe(time.Since(startToWork).Seconds()) startToWork = time.Now() } } From 482e75e4f57daf747492986f913bbbe62169c770 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 16:43:40 +0800 Subject: [PATCH 04/12] update --- cdc/sink/dmlsink/txn/worker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index 12f8f02f0a8..aa18bc2ac37 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -117,9 +117,9 @@ func (w *worker) runLoop() error { zap.Int("workerID", w.ID)) return nil case txn := <-w.txnCh.Out(): - // we get the data from txnCh.out until no more data here or reached the state that can be flushed. - // If no more daa in txnCh.out, and also not reached the state that can be flushed, - // we will wait for 10ms and then do flush to avoid too much flush wish small amount of txns. + // we get the data from txnCh.out until no more data here or reach the state that can be flushed. + // If no more data in txnCh.out, and also not reach the state that can be flushed, + // we will wait for 10ms and then do flush to avoid too much flush with small amount of txns. if txn.txnEvent != nil { needFlush = w.onEvent(txn) if !needFlush { From ee5e30954cc2787f978dd21a751cbbcf8c3aaf67 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 17:29:45 +0800 Subject: [PATCH 05/12] Update cdc/sink/dmlsink/txn/worker.go Co-authored-by: dongmen <20351731+asddongmen@users.noreply.github.com> --- cdc/sink/dmlsink/txn/worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index aa18bc2ac37..374f308fb82 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -130,6 +130,7 @@ func (w *worker) runLoop() error { case txn := <-w.txnCh.Out(): needFlush = w.onEvent(txn) case <-delay: + needFlush = true break loop } } From 2cd9fb9c9d7ee187a1cc40d1f0a873f0983f84fb Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 17:31:36 +0800 Subject: [PATCH 06/12] update --- cdc/sink/dmlsink/txn/worker.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index aa18bc2ac37..3f417694ed3 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -124,13 +124,12 @@ func (w *worker) runLoop() error { needFlush = w.onEvent(txn) if !needFlush { delay := time.After(w.flushInterval) - loop: for !needFlush { select { case txn := <-w.txnCh.Out(): needFlush = w.onEvent(txn) case <-delay: - break loop + needFlush = true } } } From a32e5b009a61f0d4d7048ec28d68dfd120eb0441 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 17:47:59 +0800 Subject: [PATCH 07/12] update --- cdc/sink/dmlsink/txn/worker.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index 3f417694ed3..c3d60d09bdb 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -107,7 +107,7 @@ func (w *worker) runLoop() error { zap.Int("workerID", w.ID)) needFlush := false - startToWork := time.Now() + startToBatching := time.Now() for { select { @@ -147,8 +147,8 @@ func (w *worker) runLoop() error { // we record total time to calcuate the worker busy ratio. // so we record the total time after flushing, to unified statistics on // flush time and total time - w.metricTxnWorkerTotalDuration.Observe(time.Since(startToWork).Seconds()) - startToWork = time.Now() + w.metricTxnWorkerTotalDuration.Observe(time.Since(startToBatching).Seconds()) + startToBatching = time.Now() } } } @@ -179,8 +179,7 @@ func (w *worker) doFlush() error { if w.hasPending { start := time.Now() defer func() { - elapsed := time.Since(start) - w.metricTxnWorkerFlushDuration.Observe(elapsed.Seconds()) + w.metricTxnWorkerFlushDuration.Observe(time.Since(start).Seconds()) }() if err := w.backend.Flush(w.ctx); err != nil { log.Warn("Transaction dmlSink backend flush fail", From c2bcf7afe4cb46b475cd0b00a6378461b67ec752 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 15 Jan 2024 18:04:24 +0800 Subject: [PATCH 08/12] change name --- cdc/sink/dmlsink/txn/txn_dml_sink.go | 2 +- cdc/sink/dmlsink/txn/worker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index b8f78059f5d..ce5ec8b9bdc 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -110,7 +110,7 @@ func newSink(ctx context.Context, g, ctx1 := errgroup.WithContext(ctx) for i, backend := range backends { w := newWorker(ctx1, changefeedID, i, backend, len(backends)) - g.Go(func() error { return w.runLoop() }) + g.Go(func() error { return w.run() }) sink.workers = append(sink.workers, w) } diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index c3d60d09bdb..c89685e847c 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -93,7 +93,7 @@ func (w *worker) close() { } // Continuously get events from txnCh and call backend flush based on conditions. -func (w *worker) runLoop() error { +func (w *worker) run() error { defer func() { if err := w.backend.Close(); err != nil { log.Info("Transaction dmlSink backend close fail", From e3face4a35118fc1d2ccd5107f9b57ba71e7e911 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 16 Jan 2024 14:28:10 +0800 Subject: [PATCH 09/12] change name --- cdc/sink/dmlsink/txn/worker.go | 36 +++++++++++++++++----------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index c89685e847c..ec48e8f0054 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -107,7 +107,7 @@ func (w *worker) run() error { zap.Int("workerID", w.ID)) needFlush := false - startToBatching := time.Now() + start := time.Now() for { select { @@ -123,33 +123,33 @@ func (w *worker) run() error { if txn.txnEvent != nil { needFlush = w.onEvent(txn) if !needFlush { - delay := time.After(w.flushInterval) + delayTicker := time.NewTicker(w.flushInterval) for !needFlush { select { case txn := <-w.txnCh.Out(): needFlush = w.onEvent(txn) - case <-delay: + case <-delayTicker.C: needFlush = true } } + delayTicker.Stop() } + // needFlush must be true here, so we can do flush. + if err := w.doFlush(); err != nil { + log.Error("Transaction dmlSink worker exits unexpectly", + zap.String("changefeedID", w.changefeed), + zap.Int("workerID", w.ID), + zap.Error(err)) + return err + } + needFlush = false + // we record total time to calcuate the worker busy ratio. + // so we record the total time after flushing, to unified statistics on + // flush time and total time + w.metricTxnWorkerTotalDuration.Observe(time.Since(start).Seconds()) + start = time.Now() } } - if needFlush { - if err := w.doFlush(); err != nil { - log.Error("Transaction dmlSink worker exits unexpectly", - zap.String("changefeedID", w.changefeed), - zap.Int("workerID", w.ID), - zap.Error(err)) - return err - } - needFlush = false - // we record total time to calcuate the worker busy ratio. - // so we record the total time after flushing, to unified statistics on - // flush time and total time - w.metricTxnWorkerTotalDuration.Observe(time.Since(startToBatching).Seconds()) - startToBatching = time.Now() - } } } From ac50340a3583905be879aa4fe4e108abdcc6e69e Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 16 Jan 2024 15:45:30 +0800 Subject: [PATCH 10/12] update --- cdc/sink/dmlsink/txn/worker.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index ec48e8f0054..bfee32c5fb4 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -123,16 +123,18 @@ func (w *worker) run() error { if txn.txnEvent != nil { needFlush = w.onEvent(txn) if !needFlush { - delayTicker := time.NewTicker(w.flushInterval) + delay := time.NewTimer(w.flushInterval) for !needFlush { select { case txn := <-w.txnCh.Out(): needFlush = w.onEvent(txn) - case <-delayTicker.C: + case <-delay.C: needFlush = true } } - delayTicker.Stop() + if !delay.Stop() { + <-delay.C + } } // needFlush must be true here, so we can do flush. if err := w.doFlush(); err != nil { From c56aaa2d8b7697d09b767fd7ef4a259d470594ed Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 16 Jan 2024 16:49:09 +0800 Subject: [PATCH 11/12] update --- cdc/sink/dmlsink/txn/worker.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index bfee32c5fb4..38820537917 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -132,8 +132,12 @@ func (w *worker) run() error { needFlush = true } } + //Release resources promptly if !delay.Stop() { - <-delay.C + select { + case <-delay.C: + default: + } } } // needFlush must be true here, so we can do flush. From 7d8bd0bba2d4e9c26c95c355741aec25969e7414 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 16 Jan 2024 18:01:53 +0800 Subject: [PATCH 12/12] update --- cdc/sink/dmlsink/txn/worker.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go index 38820537917..014ca2d234a 100644 --- a/cdc/sink/dmlsink/txn/worker.go +++ b/cdc/sink/dmlsink/txn/worker.go @@ -106,9 +106,7 @@ func (w *worker) run() error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) - needFlush := false start := time.Now() - for { select { case <-w.ctx.Done(): @@ -121,7 +119,7 @@ func (w *worker) run() error { // If no more data in txnCh.out, and also not reach the state that can be flushed, // we will wait for 10ms and then do flush to avoid too much flush with small amount of txns. if txn.txnEvent != nil { - needFlush = w.onEvent(txn) + needFlush := w.onEvent(txn) if !needFlush { delay := time.NewTimer(w.flushInterval) for !needFlush { @@ -132,7 +130,7 @@ func (w *worker) run() error { needFlush = true } } - //Release resources promptly + // Release resources promptly if !delay.Stop() { select { case <-delay.C: @@ -148,7 +146,6 @@ func (w *worker) run() error { zap.Error(err)) return err } - needFlush = false // we record total time to calcuate the worker busy ratio. // so we record the total time after flushing, to unified statistics on // flush time and total time