From cf4c080db3dbcd978d150aaf5e1efbe09f299abe Mon Sep 17 00:00:00 2001 From: Luo Yangzhixin Date: Wed, 14 Sep 2022 17:13:00 +0800 Subject: [PATCH 1/2] syncer(dm): Fix follow-up bugs after fixing nondeterministic DDL (#7036) close pingcap/tiflow#6628 --- dm/syncer/ddl.go | 7 ++--- dm/syncer/syncer.go | 40 ++++++++++++++-------------- engine/pkg/orm/client.go | 6 ++--- engine/servermaster/jobop/backoff.go | 8 +++--- 4 files changed, 31 insertions(+), 30 deletions(-) diff --git a/dm/syncer/ddl.go b/dm/syncer/ddl.go index 7c68fc5c251..ee524895d40 100644 --- a/dm/syncer/ddl.go +++ b/dm/syncer/ddl.go @@ -255,12 +255,13 @@ func (ddl *DDLWorker) HandleQueryEvent(ev *replication.QueryEvent, ec eventConte qec.p, err = event.GetParserForStatusVars(ev.StatusVars) if err != nil { - ddl.logger.Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) + ddl.logger.Warn("found error when getting sql_mode from binlog status_vars", zap.Error(err)) } qec.timezone, err = event.GetTimezoneByStatusVars(ev.StatusVars, ddl.upstreamTZStr) - if err != nil { - ddl.logger.Warn("found error when get timezone from binlog status_vars", zap.Error(err)) + // no timezone information retrieved and upstream timezone not previously set + if err != nil && ddl.upstreamTZStr == "" { + ddl.logger.Warn("found error when getting timezone from binlog status_vars", zap.Error(err)) } qec.timestamp = ec.header.Timestamp diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index d474396d483..9536e8b9636 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1323,26 +1323,6 @@ func (s *Syncer) syncDDL(queueBucket string, db *dbconn.DBConn, ddlJobChan chan return } - // set timezone - if ddlJob.timezone != "" { - s.timezoneLastTime = ddlJob.timezone - setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", ddlJob.timezone) - ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...) - setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT" - ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault) - } else if s.timezoneLastTime != "" { - // use last time's time zone - setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", s.timezoneLastTime) - ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...) - setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT" - ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault) - } - // set timestamp - setTimestampSQL := fmt.Sprintf("SET TIMESTAMP = %d", ddlJob.timestamp) - ddlJob.ddls = append([]string{setTimestampSQL}, ddlJob.ddls...) - setTimestampSQLDefault := "SET TIMESTAMP = DEFAULT" - ddlJob.ddls = append(ddlJob.ddls, setTimestampSQLDefault) - // add this ddl ts beacause we start to exec this ddl. s.updateReplicationJobTS(ddlJob, ddlJobIdx) @@ -1377,6 +1357,26 @@ func (s *Syncer) syncDDL(queueBucket string, db *dbconn.DBConn, ddlJobChan chan }) if !ignore { + // set timezone + if ddlJob.timezone != "" { + s.timezoneLastTime = ddlJob.timezone + setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", ddlJob.timezone) + ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...) + setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT" + ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault) + } else if s.timezoneLastTime != "" { + // use last time's time zone + setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", s.timezoneLastTime) + ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...) + setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT" + ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault) + } + // set timestamp + setTimestampSQL := fmt.Sprintf("SET TIMESTAMP = %d", ddlJob.timestamp) + ddlJob.ddls = append([]string{setTimestampSQL}, ddlJob.ddls...) + setTimestampSQLDefault := "SET TIMESTAMP = DEFAULT" + ddlJob.ddls = append(ddlJob.ddls, setTimestampSQLDefault) + var affected int var ddlCreateTime int64 = -1 // default when scan failed row, err2 := db.QuerySQL(s.syncCtx, s.metricsProxies, "SELECT UNIX_TIMESTAMP()") diff --git a/engine/pkg/orm/client.go b/engine/pkg/orm/client.go index ca9ceea52ad..ed3d011616c 100644 --- a/engine/pkg/orm/client.go +++ b/engine/pkg/orm/client.go @@ -713,9 +713,9 @@ func (c *metaOpsClient) SetJobCanceling(ctx context.Context, jobID string) (Resu } // SetJobCanceled sets a cancelled status if a cancelling op exists. -// - If cancelling operation is not found, it can be triggered by unexpected -// SetJobCanceled don't make any change and return nil error. -// - If a job is already cancelled, don't make any change and return nil error. +// - If cancelling operation is not found, it can be triggered by unexpected +// SetJobCanceled don't make any change and return nil error. +// - If a job is already cancelled, don't make any change and return nil error. func (c *metaOpsClient) SetJobCanceled(ctx context.Context, jobID string) (Result, error) { result := &ormResult{} ops := &model.JobOp{ diff --git a/engine/servermaster/jobop/backoff.go b/engine/servermaster/jobop/backoff.go index bd407d5e582..f17021a6e90 100644 --- a/engine/servermaster/jobop/backoff.go +++ b/engine/servermaster/jobop/backoff.go @@ -53,10 +53,10 @@ func NewJobBackoff(jobID string, clocker clock.Clock, config *BackoffConfig) *Jo // JobBackoff is a job backoff manager, it recoreds job online and offline events // and determines whether a job can be re-created based on backoff mechanism. // The backoff stragegy is as following -// - Each time a fail event arrives, the backoff time will be move forward by -// nextBackoff. -// - If a job is success for more than `resetInterval`, the backoff history will -// be cleared, and backoff time will be re-calculated. +// - Each time a fail event arrives, the backoff time will be move forward by +// nextBackoff. +// - If a job is success for more than `resetInterval`, the backoff history will +// be cleared, and backoff time will be re-calculated. type JobBackoff struct { jobID string clocker clock.Clock From 3f10ffac77c4df466d41e3f672b77c1afc894838 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 14 Sep 2022 23:54:59 +0800 Subject: [PATCH 2/2] metrics: replace processor checkpoint with owner checkpoint (#7051) close pingcap/tiflow#7050 --- metrics/alertmanager/ticdc.rules.yml | 4 ++-- metrics/grafana/ticdc.json | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/metrics/alertmanager/ticdc.rules.yml b/metrics/alertmanager/ticdc.rules.yml index af3cd9481f8..2462c71f878 100644 --- a/metrics/alertmanager/ticdc.rules.yml +++ b/metrics/alertmanager/ticdc.rules.yml @@ -26,12 +26,12 @@ groups: summary: cdc cluster has no owner for more than 10 minutes - alert: cdc_checkpoint_high_delay - expr: ticdc_processor_checkpoint_ts_lag > 600 + expr: ticdc_owner_checkpoint_ts_lag > 600 for: 1m labels: env: ENV_LABELS_ENV level: critical - expr: ticdc_processor_checkpoint_ts_lag > 600 + expr: ticdc_owner_checkpoint_ts_lag > 600 annotations: description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' value: '{{ $value }}' diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 8911af7f50e..b452002828d 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -62,7 +62,7 @@ { "datasource": "${DS_TEST-CLUSTER}", "enable": true, - "expr": "max(ticdc_processor_checkpoint_ts_lag) by (changefeed, instance) > BOOL $spike_threshold", + "expr": "max(ticdc_owner_checkpoint_ts_lag) by (changefeed, instance) > BOOL $spike_threshold", "hide": true, "iconColor": "#F2495C", "limit": 100, @@ -3781,6 +3781,7 @@ "exemplar": true, "expr": "max(ticdc_processor_checkpoint_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)", "format": "time_series", + "hide": true, "interval": "", "intervalFactor": 1, "legendFormat": "{{instance}}-{{changefeed}}", @@ -3893,6 +3894,7 @@ "exemplar": true, "expr": "max(ticdc_processor_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)", "format": "time_series", + "hide": true, "interval": "", "intervalFactor": 1, "legendFormat": "{{instance}}-{{changefeed}}", @@ -22401,4 +22403,4 @@ "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", "version": 43 -} +} \ No newline at end of file