Skip to content

Commit

Permalink
Merge branch 'master' into fix-action
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 15, 2022
2 parents 8ac328f + 3f10ffa commit 3cb72d1
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 34 deletions.
7 changes: 4 additions & 3 deletions dm/syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,13 @@ func (ddl *DDLWorker) HandleQueryEvent(ev *replication.QueryEvent, ec eventConte

qec.p, err = event.GetParserForStatusVars(ev.StatusVars)
if err != nil {
ddl.logger.Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
ddl.logger.Warn("found error when getting sql_mode from binlog status_vars", zap.Error(err))
}

qec.timezone, err = event.GetTimezoneByStatusVars(ev.StatusVars, ddl.upstreamTZStr)
if err != nil {
ddl.logger.Warn("found error when get timezone from binlog status_vars", zap.Error(err))
// no timezone information retrieved and upstream timezone not previously set
if err != nil && ddl.upstreamTZStr == "" {
ddl.logger.Warn("found error when getting timezone from binlog status_vars", zap.Error(err))
}

qec.timestamp = ec.header.Timestamp
Expand Down
40 changes: 20 additions & 20 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,26 +1323,6 @@ func (s *Syncer) syncDDL(queueBucket string, db *dbconn.DBConn, ddlJobChan chan
return
}

// set timezone
if ddlJob.timezone != "" {
s.timezoneLastTime = ddlJob.timezone
setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", ddlJob.timezone)
ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...)
setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT"
ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault)
} else if s.timezoneLastTime != "" {
// use last time's time zone
setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", s.timezoneLastTime)
ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...)
setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT"
ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault)
}
// set timestamp
setTimestampSQL := fmt.Sprintf("SET TIMESTAMP = %d", ddlJob.timestamp)
ddlJob.ddls = append([]string{setTimestampSQL}, ddlJob.ddls...)
setTimestampSQLDefault := "SET TIMESTAMP = DEFAULT"
ddlJob.ddls = append(ddlJob.ddls, setTimestampSQLDefault)

// add this ddl ts beacause we start to exec this ddl.
s.updateReplicationJobTS(ddlJob, ddlJobIdx)

Expand Down Expand Up @@ -1377,6 +1357,26 @@ func (s *Syncer) syncDDL(queueBucket string, db *dbconn.DBConn, ddlJobChan chan
})

if !ignore {
// set timezone
if ddlJob.timezone != "" {
s.timezoneLastTime = ddlJob.timezone
setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", ddlJob.timezone)
ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...)
setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT"
ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault)
} else if s.timezoneLastTime != "" {
// use last time's time zone
setTimezoneSQL := fmt.Sprintf("SET SESSION TIME_ZONE = '%s'", s.timezoneLastTime)
ddlJob.ddls = append([]string{setTimezoneSQL}, ddlJob.ddls...)
setTimezoneSQLDefault := "SET SESSION TIME_ZONE = DEFAULT"
ddlJob.ddls = append(ddlJob.ddls, setTimezoneSQLDefault)
}
// set timestamp
setTimestampSQL := fmt.Sprintf("SET TIMESTAMP = %d", ddlJob.timestamp)
ddlJob.ddls = append([]string{setTimestampSQL}, ddlJob.ddls...)
setTimestampSQLDefault := "SET TIMESTAMP = DEFAULT"
ddlJob.ddls = append(ddlJob.ddls, setTimestampSQLDefault)

var affected int
var ddlCreateTime int64 = -1 // default when scan failed
row, err2 := db.QuerySQL(s.syncCtx, s.metricsProxies, "SELECT UNIX_TIMESTAMP()")
Expand Down
6 changes: 3 additions & 3 deletions engine/pkg/orm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,9 +713,9 @@ func (c *metaOpsClient) SetJobCanceling(ctx context.Context, jobID string) (Resu
}

// SetJobCanceled sets a cancelled status if a cancelling op exists.
// - If cancelling operation is not found, it can be triggered by unexpected
// SetJobCanceled don't make any change and return nil error.
// - If a job is already cancelled, don't make any change and return nil error.
// - If cancelling operation is not found, it can be triggered by unexpected
// SetJobCanceled don't make any change and return nil error.
// - If a job is already cancelled, don't make any change and return nil error.
func (c *metaOpsClient) SetJobCanceled(ctx context.Context, jobID string) (Result, error) {
result := &ormResult{}
ops := &model.JobOp{
Expand Down
8 changes: 4 additions & 4 deletions engine/servermaster/jobop/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func NewJobBackoff(jobID string, clocker clock.Clock, config *BackoffConfig) *Jo
// JobBackoff is a job backoff manager, it recoreds job online and offline events
// and determines whether a job can be re-created based on backoff mechanism.
// The backoff stragegy is as following
// - Each time a fail event arrives, the backoff time will be move forward by
// nextBackoff.
// - If a job is success for more than `resetInterval`, the backoff history will
// be cleared, and backoff time will be re-calculated.
// - Each time a fail event arrives, the backoff time will be move forward by
// nextBackoff.
// - If a job is success for more than `resetInterval`, the backoff history will
// be cleared, and backoff time will be re-calculated.
type JobBackoff struct {
jobID string
clocker clock.Clock
Expand Down
4 changes: 2 additions & 2 deletions metrics/alertmanager/ticdc.rules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ groups:
summary: cdc cluster has no owner for more than 10 minutes

- alert: cdc_checkpoint_high_delay
expr: ticdc_processor_checkpoint_ts_lag > 600
expr: ticdc_owner_checkpoint_ts_lag > 600
for: 1m
labels:
env: ENV_LABELS_ENV
level: critical
expr: ticdc_processor_checkpoint_ts_lag > 600
expr: ticdc_owner_checkpoint_ts_lag > 600
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
value: '{{ $value }}'
Expand Down
6 changes: 4 additions & 2 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
{
"datasource": "${DS_TEST-CLUSTER}",
"enable": true,
"expr": "max(ticdc_processor_checkpoint_ts_lag) by (changefeed, instance) > BOOL $spike_threshold",
"expr": "max(ticdc_owner_checkpoint_ts_lag) by (changefeed, instance) > BOOL $spike_threshold",
"hide": true,
"iconColor": "#F2495C",
"limit": 100,
Expand Down Expand Up @@ -3781,6 +3781,7 @@
"exemplar": true,
"expr": "max(ticdc_processor_checkpoint_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)",
"format": "time_series",
"hide": true,
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{instance}}-{{changefeed}}",
Expand Down Expand Up @@ -3893,6 +3894,7 @@
"exemplar": true,
"expr": "max(ticdc_processor_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)",
"format": "time_series",
"hide": true,
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{instance}}-{{changefeed}}",
Expand Down Expand Up @@ -22401,4 +22403,4 @@
"title": "Test-Cluster-TiCDC",
"uid": "YiGL8hBZ1",
"version": 43
}
}

0 comments on commit 3cb72d1

Please sign in to comment.