Skip to content

Commit

Permalink
sql-mode(ticdc): dealing with SQL Mode more correctly (#10644)
Browse files Browse the repository at this point in the history
close #10660
  • Loading branch information
hongyunyan authored Feb 28, 2024
1 parent 1d3ea30 commit c178273
Show file tree
Hide file tree
Showing 25 changed files with 210 additions and 166 deletions.
3 changes: 0 additions & 3 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ type ReplicaConfig struct {
Scheduler *ChangefeedSchedulerConfig `json:"scheduler"`
Integrity *IntegrityConfig `json:"integrity"`
ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"`
SQLMode string `json:"sql_mode,omitempty"`
SyncedStatus *SyncedStatusConfig `json:"synced_status,omitempty"`
}

Expand All @@ -223,7 +222,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.EnableSyncPoint = c.EnableSyncPoint
res.EnableTableMonitor = c.EnableTableMonitor
res.IgnoreIneligibleTable = c.IgnoreIneligibleTable
res.SQLMode = c.SQLMode
if c.SyncPointInterval != nil {
res.SyncPointInterval = &c.SyncPointInterval.duration
}
Expand Down Expand Up @@ -523,7 +521,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableSyncPoint: cloned.EnableSyncPoint,
EnableTableMonitor: cloned.EnableTableMonitor,
BDRMode: cloned.BDRMode,
SQLMode: cloned.SQLMode,
}

if cloned.SyncPointInterval != nil {
Expand Down
1 change: 0 additions & 1 deletion cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ var defaultAPIConfig = &ReplicaConfig{
},
ChangefeedErrorStuckDuration: &JSONDuration{*config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration},
SQLMode: config.GetDefaultReplicaConfig().SQLMode,
SyncedStatus: (*SyncedStatusConfig)(config.GetDefaultReplicaConfig().SyncedStatus),
}

Expand Down
3 changes: 0 additions & 3 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() {
if info.Config.ChangefeedErrorStuckDuration == nil {
info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration
}
if info.Config.SQLMode == "" {
info.Config.SQLMode = defaultConfig.SQLMode
}
if info.Config.SyncedStatus == nil {
info.Config.SyncedStatus = defaultConfig.SyncedStatus
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,8 @@ type DDLEvent struct {
IsBootstrap bool `msg:"-"`
// BDRRole is the role of the TiDB cluster, it is used to determine whether
// the DDL is executed by the primary cluster.
BDRRole string `msg:"-"`
BDRRole string `msg:"-"`
SQLMode mysql.SQLMode `msg:"-"`
}

// FromJob fills the values with DDLEvent from DDL job
Expand All @@ -1028,6 +1029,7 @@ func (d *DDLEvent) FromJobWithArgs(
d.Charset = job.Charset
d.Collate = job.Collate
d.BDRRole = job.BDRRole
d.SQLMode = job.SQLMode
switch d.Type {
// The query for "DROP TABLE" and "DROP VIEW" statements need
// to be rebuilt. The reason is elaborated as follows:
Expand Down
7 changes: 1 addition & 6 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/factory"
Expand Down Expand Up @@ -433,11 +432,7 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
// For example, it is needed to parse the following DDL query:
// `alter table "t" add column "c" int default 1;`
// by adding `ANSI_QUOTES` to the SQL mode.
mode, err := mysql.GetSQLMode(s.info.Config.SQLMode)
if err != nil {
return "", errors.Trace(err)
}
p.SetSQLMode(mode)
p.SetSQLMode(ddl.SQLMode)
stms, _, err := p.Parse(ddl.Query, ddl.Charset, ddl.Collate)
if err != nil {
return "", errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestWriteDDLEvent(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestAsyncExecAddIndex(t *testing.T) {
}()
if atomic.LoadInt32(&dbIndex) == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/sink/dmlsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestAdjustSQLMode(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestNewMySQLBackendExecDML(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestNewMySQLBackend(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestNewMySQLBackendWithIPv6Address(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -779,7 +779,7 @@ func TestGBKSupported(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -839,7 +839,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) {

if dbIndex == 0 {
// test db
db, err := pmysql.MockTestDB(true)
db, err := pmysql.MockTestDB()
require.Nil(t, err)
return db, nil
}
Expand Down
22 changes: 19 additions & 3 deletions dm/pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"strings"
"sync"

exprctx "github.com/pingcap/tidb/pkg/expression/context"
exprctximpl "github.com/pingcap/tidb/pkg/expression/contextimpl"
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -147,9 +150,14 @@ func UnpackTableID(id string) *filter.Table {
}
}

type exprCtxImpl struct {
*session
*exprctximpl.ExprCtxExtendedImpl
}
type session struct {
sessionctx.Context
vars *variable.SessionVars
exprctx *exprCtxImpl
values map[fmt.Stringer]interface{}
builtinFunctionUsage map[string]uint32
mu sync.RWMutex
Expand All @@ -160,6 +168,10 @@ func (se *session) GetSessionVars() *variable.SessionVars {
return se.vars
}

func (se *session) GetExprCtx() exprctx.BuildContext {
return se.exprctx
}

// SetValue implements the sessionctx.Context interface.
func (se *session) SetValue(key fmt.Stringer, value interface{}) {
se.mu.Lock()
Expand All @@ -176,7 +188,7 @@ func (se *session) Value(key fmt.Stringer) interface{} {
}

// GetInfoSchema implements the sessionctx.Context interface.
func (se *session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
func (se *session) GetInfoSchema() infoschema.InfoSchemaMetaVersion {
return nil
}

Expand All @@ -201,12 +213,16 @@ func NewSessionCtx(vars map[string]string) sessionctx.Context {
variables.TimeZone = loc
}
}

return &session{
sessionCtx := session{
vars: variables,
values: make(map[fmt.Stringer]interface{}, 1),
builtinFunctionUsage: make(map[string]uint32),
}
sessionCtx.exprctx = &exprCtxImpl{
session: &sessionCtx,
ExprCtxExtendedImpl: exprctximpl.NewExprExtendedImpl(&sessionCtx),
}
return &sessionCtx
}

// AdjustBinaryProtocolForDatum converts the data in binlog to TiDB datum.
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/expr_filter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func SkipDMLByExpression(ctx sessionctx.Context, row []interface{}, expr express
}
r := chunk.MutRowFromDatums(data).ToRow()

d, err := expr.Eval(ctx, r)
d, err := expr.Eval(ctx.GetExprCtx(), r)
if err != nil {
return false, err
}
Expand All @@ -198,7 +198,7 @@ func SkipDMLByExpression(ctx sessionctx.Context, row []interface{}, expr express
// getSimpleExprOfTable returns an expression of given `expr` string, using the table structure that is tracked before.
func getSimpleExprOfTable(ctx sessionctx.Context, expr string, ti *model.TableInfo, logger log.Logger) (expression.Expression, error) {
// TODO: use upstream timezone?
e, err := expression.ParseSimpleExprWithTableInfo(ctx, expr, ti)
e, err := expression.ParseSimpleExprWithTableInfo(ctx.GetExprCtx(), expr, ti)
if err != nil {
// if expression contains an unknown column, we return an expression that skips nothing
if plannererrors.ErrUnknownColumn.Equal(err) {
Expand Down
37 changes: 19 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.5.0
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
Expand All @@ -66,14 +66,14 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20240109063850-932639606bcf
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20240219052425-e3e0f7e1bc44
github.com/pingcap/tidb v1.1.0-beta.0.20240228123331-27ce02afd2e3
github.com/pingcap/tidb-dashboard v0.0.0-20240127080020-2171c6e6b9d7
github.com/pingcap/tidb-tools v0.0.0-20240202030925-a6014db89eb8
github.com/pingcap/tidb/pkg/parser v0.0.0-20240219043455-3ceeb3ff70bf
github.com/pingcap/tidb/pkg/parser v0.0.0-20240228123331-27ce02afd2e3
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/client_model v0.6.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/robfig/cron v1.2.0
Expand All @@ -89,9 +89,9 @@ require (
github.com/swaggo/swag v1.16.2
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20240205071126-11cb7985f0ec
github.com/tikv/client-go/v2 v2.0.8-0.20240223022444-8d28d3cd3a10
github.com/tikv/pd v1.1.0-beta.0.20231212061647-ab97b9a267f3
github.com/tikv/pd/client v0.0.0-20240126020320-567c7d43a008
github.com/tikv/pd/client v0.0.0-20240221051526-d6d9feab3e2a
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -109,17 +109,17 @@ require (
go.uber.org/mock v0.4.0
go.uber.org/multierr v1.11.0
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/net v0.21.0
golang.org/x/oauth2 v0.17.0
golang.org/x/sync v0.6.0
golang.org/x/sys v0.17.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe
google.golang.org/grpc v1.61.0
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c
google.golang.org/grpc v1.62.0
google.golang.org/protobuf v1.32.0
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/mysql v1.4.5
Expand Down Expand Up @@ -167,13 +167,15 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/otiai10/copy v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/scalalang2/golang-fifo v0.1.5 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/tidwall/btree v1.7.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect
golang.org/x/arch v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
k8s.io/api v0.28.4 // indirect
k8s.io/apimachinery v0.28.4 // indirect
Expand All @@ -185,9 +187,9 @@ require (

require (
cloud.google.com/go v0.112.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute v1.24.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/iam v1.1.6 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
Expand Down Expand Up @@ -264,7 +266,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/huichen/murmur v0.0.0-20130808212358-e0489551cf51 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/improbable-eng/grpc-web v0.12.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
Expand Down Expand Up @@ -373,7 +374,7 @@ require (
golang.org/x/mod v0.15.0 // indirect
golang.org/x/term v0.17.0
golang.org/x/tools v0.18.0 // indirect
google.golang.org/api v0.156.0 // indirect
google.golang.org/api v0.162.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
Expand Down
Loading

0 comments on commit c178273

Please sign in to comment.