Skip to content

Commit

Permalink
syncer(dm) : fix default collation with upstream in create table stat…
Browse files Browse the repository at this point in the history
…ement (pingcap#3575)
  • Loading branch information
WizardXiao authored and okJiang committed Dec 8, 2021
1 parent ffe3b15 commit 64a2f20
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 54 deletions.
18 changes: 18 additions & 0 deletions dm/pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,24 @@ func GetParserForStatusVars(statusVars []byte) (*parser.Parser, error) {
return parser2, err
}

// GetServerCollationByStatusVars gets server collation by binlog statusVars.
func GetServerCollationByStatusVars(statusVars []byte) (string, error) {
vars, err := statusVarsToKV(statusVars)
b, ok := vars[QCharsetCode]

if !ok {
if err == nil {
// only happen when this is a dummy event generated by DM
err = fmt.Errorf("Q_CHARSET_CODE not found in status_vars %v", statusVars)
}
// mysql default 'latin1_swedish_ci'
return "latin1_swedish_ci", err
}
// QCharsetCode 2-byte character_set_client + 2-byte collation_connection + 2-byte collation_server
// collation is less than 255 and we use the first byte.
return mysql.Collations[b[4]], err
}

// if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success
// returned map should not be nil for other usage.
func statusVarsToKV(statusVars []byte) (map[byte][]byte, error) {
Expand Down
8 changes: 8 additions & 0 deletions dm/pkg/binlog/event/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ func (t *testUtilSuite) TestStatusVarsToKV(c *C) {
},
nil,
},
// only Q_CHARSET_CODE
{
[]byte{4, 33, 0, 33, 0, 8, 0},
map[byte][]byte{
4: {33, 0, 33, 0, 8, 0},
},
nil,
},
// copied from a integration test
{
[]byte{0, 0, 0, 0, 0, 1, 4, 0, 8, 0, 0, 0, 0, 0, 6, 3, 115, 116, 100, 4, 33, 0, 33, 0, 8, 0, 12, 1, 97, 108, 108, 95, 109, 111, 100, 101, 0},
Expand Down
45 changes: 45 additions & 0 deletions dm/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,51 @@ func GetServerUnixTS(ctx context.Context, db *sql.DB) (int64, error) {
return ts, err
}

// GetCharsetAndDefaultCollation gets charset and default collation map.
func GetCharsetAndDefaultCollation(ctx context.Context, db *sql.DB) (map[string]string, error) {
charsetAndDefaultCollation := make(map[string]string)
conn, err := db.Conn(ctx)
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer conn.Close()

// Show an example.
/*
mysql> SHOW CHARACTER SET;
+----------+---------------------------------+---------------------+--------+
| Charset | Description | Default collation | Maxlen |
+----------+---------------------------------+---------------------+--------+
| armscii8 | ARMSCII-8 Armenian | armscii8_general_ci | 1 |
| ascii | US ASCII | ascii_general_ci | 1 |
| big5 | Big5 Traditional Chinese | big5_chinese_ci | 2 |
| binary | Binary pseudo charset | binary | 1 |
| cp1250 | Windows Central European | cp1250_general_ci | 1 |
| cp1251 | Windows Cyrillic | cp1251_general_ci | 1 |
+----------+---------------------------------+---------------------+--------+
*/

rows, err := conn.QueryContext(ctx, "SHOW CHARACTER SET")
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}

defer rows.Close()
for rows.Next() {
var charset, description, collation string
var maxlen int
if scanErr := rows.Scan(&charset, &description, &collation, &maxlen); scanErr != nil {
return nil, terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError)
}
charsetAndDefaultCollation[strings.ToLower(charset)] = collation
}

if err = rows.Close(); err != nil {
return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError)
}
return charsetAndDefaultCollation, err
}

// GetSchemaList gets db schema list with `SHOW DATABASES`.
func GetSchemaList(ctx context.Context, db *sql.DB) ([]string, error) {
schemaList := []string{}
Expand Down
5 changes: 5 additions & 0 deletions dm/syncer/dbconn/upstream_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (conn *UpStreamConn) GetServerUnixTS(ctx context.Context) (int64, error) {
return utils.GetServerUnixTS(ctx, conn.BaseDB.DB)
}

// GetCharsetAndDefaultCollation returns charset and default collation map.
func (conn *UpStreamConn) GetCharsetAndDefaultCollation(ctx context.Context) (map[string]string, error) {
return utils.GetCharsetAndDefaultCollation(ctx, conn.BaseDB.DB)
}

// GetParser returns the parser with correct flag for upstream.
func (conn *UpStreamConn) GetParser(ctx context.Context) (*parser.Parser, error) {
return utils.GetParser(ctx, conn.BaseDB.DB)
Expand Down
83 changes: 74 additions & 9 deletions dm/syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"time"

"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"go.uber.org/zap"

"github.com/pingcap/ticdc/dm/pkg/binlog/event"
tcontext "github.com/pingcap/ticdc/dm/pkg/context"
parserpkg "github.com/pingcap/ticdc/dm/pkg/parser"
"github.com/pingcap/ticdc/dm/pkg/terror"
Expand Down Expand Up @@ -52,7 +52,7 @@ func parseOneStmt(qec *queryEventContext) (stmt ast.StmtNode, err error) {
// 3. apply online ddl if onlineDDL is not nil:
// * specially, if skip, apply empty string;
func (s *Syncer) processOneDDL(qec *queryEventContext, sql string) ([]string, error) {
ddlInfo, err := s.genDDLInfo(qec.p, qec.ddlSchema, sql)
ddlInfo, err := s.genDDLInfo(qec, sql)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -102,14 +102,14 @@ func (s *Syncer) processOneDDL(qec *queryEventContext, sql string) ([]string, er
}

// genDDLInfo generates ddl info by given sql.
func (s *Syncer) genDDLInfo(p *parser.Parser, schema, sql string) (*ddlInfo, error) {
func (s *Syncer) genDDLInfo(qec *queryEventContext, sql string) (*ddlInfo, error) {
s.tctx.L().Debug("begin generate ddl info", zap.String("event", "query"), zap.String("statement", sql))
stmt, err := p.ParseOneStmt(sql, "", "")
stmt, err := qec.p.ParseOneStmt(sql, "", "")
if err != nil {
return nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "ddl %s", sql)
}

sourceTables, err := parserpkg.FetchDDLTables(schema, stmt, s.SourceTableNamesFlavor)
sourceTables, err := parserpkg.FetchDDLTables(qec.ddlSchema, stmt, s.SourceTableNamesFlavor)
if err != nil {
return nil, err
}
Expand All @@ -120,14 +120,17 @@ func (s *Syncer) genDDLInfo(p *parser.Parser, schema, sql string) (*ddlInfo, err
targetTables = append(targetTables, renamedTable)
}

routedDDL, err := parserpkg.RenameDDLTable(stmt, targetTables)
return &ddlInfo{
ddlInfo := &ddlInfo{
originDDL: sql,
routedDDL: routedDDL,
originStmt: stmt,
sourceTables: sourceTables,
targetTables: targetTables,
}, err
}

adjustCollation(s.tctx, ddlInfo, qec.eventStatusVars, s.charsetAndDefaultCollation)
routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.originStmt, ddlInfo.targetTables)
ddlInfo.routedDDL = routedDDL
return ddlInfo, err
}

func (s *Syncer) dropSchemaInSharding(tctx *tcontext.Context, sourceSchema string) error {
Expand Down Expand Up @@ -192,6 +195,68 @@ func (s *Syncer) clearOnlineDDL(tctx *tcontext.Context, targetTable *filter.Tabl
return nil
}

// adjustCollation adds collation for create database and check create table.
func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte, charsetAndDefaultCollationMap map[string]string) {
switch createStmt := ddlInfo.originStmt.(type) {
case *ast.CreateTableStmt:
if createStmt.ReferTable != nil {
return
}
var justCharset string
for _, tableOption := range createStmt.Options {
// already have 'Collation'
if tableOption.Tp == ast.TableOptionCollate {
return
}
if tableOption.Tp == ast.TableOptionCharset {
justCharset = tableOption.StrValue
}
}
if justCharset == "" {
tctx.L().Warn("detect create table risk which use implicit charset and collation", zap.String("originSQL", ddlInfo.originDDL))
return
}
// just has charset, can add collation by charset and default collation map
collation, ok := charsetAndDefaultCollationMap[strings.ToLower(justCharset)]
if !ok {
tctx.L().Warn("not found charset default collation.", zap.String("originSQL", ddlInfo.originDDL), zap.String("charset", strings.ToLower(justCharset)))
return
}
tctx.L().Info("detect create table risk which use explicit charset and implicit collation, we will add collation by SHOW CHARACTER SET", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation})

case *ast.CreateDatabaseStmt:
var justCharset, collation string
var ok bool
for _, createOption := range createStmt.Options {
// already have 'Collation'
if createOption.Tp == ast.DatabaseOptionCollate {
return
}
if createOption.Tp == ast.DatabaseOptionCharset {
justCharset = createOption.Value
}
}

// just has charset, can add collation by charset and default collation map
if justCharset != "" {
collation, ok = charsetAndDefaultCollationMap[strings.ToLower(justCharset)]
if !ok {
tctx.L().Warn("not found charset default collation.", zap.String("originSQL", ddlInfo.originDDL), zap.String("charset", strings.ToLower(justCharset)))
return
}
tctx.L().Info("detect create database risk which use explicit charset and implicit collation, we will add collation by SHOW CHARACTER SET", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
} else {
// has no charset and collation
// add collation by server collation from binlog statusVars
collation, _ = event.GetServerCollationByStatusVars(statusVars)
// add collation
tctx.L().Info("detect create database risk which use implicit charset and collation, we will add collation by binlog status_vars", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
}
createStmt.Options = append(createStmt.Options, &ast.DatabaseOption{Tp: ast.DatabaseOptionCollate, Value: collation})
}
}

type ddlInfo struct {
originDDL string
routedDDL string
Expand Down
71 changes: 62 additions & 9 deletions dm/syncer/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) {
}

targetSQLs := [][]string{
{"CREATE DATABASE IF NOT EXISTS `xs1`"},
{"CREATE DATABASE IF NOT EXISTS `xs1`"},
{"CREATE DATABASE IF NOT EXISTS `xs1` COLLATE = utf8mb4_bin"},
{"CREATE DATABASE IF NOT EXISTS `xs1` COLLATE = utf8mb4_bin"},
{"DROP DATABASE IF EXISTS `xs1`"},
{"DROP DATABASE IF EXISTS `xs1`"},
{"DROP TABLE IF EXISTS `xs1`.`t1`"},
Expand Down Expand Up @@ -233,14 +233,15 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) {
ec := &eventContext{
tctx: tctx,
}

statusVars := []byte{4, 0, 0, 0, 0, 46, 0}
for i, sql := range sqls {
qec := &queryEventContext{
eventContext: ec,
ddlSchema: "test",
originSQL: sql,
appliedDDLs: make([]string, 0),
p: parser.New(),
eventContext: ec,
ddlSchema: "test",
originSQL: sql,
appliedDDLs: make([]string, 0),
p: parser.New(),
eventStatusVars: statusVars,
}
stmt, err := parseOneStmt(qec)
c.Assert(err, IsNil)
Expand All @@ -260,7 +261,7 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) {
c.Assert(qec.appliedDDLs, DeepEquals, expectedSQLs[i])
c.Assert(targetSQLs[i], HasLen, len(qec.appliedDDLs))
for j, sql2 := range qec.appliedDDLs {
ddlInfo, err2 := syncer.genDDLInfo(qec.p, qec.ddlSchema, sql2)
ddlInfo, err2 := syncer.genDDLInfo(qec, sql2)
c.Assert(err2, IsNil)
c.Assert(targetSQLs[i][j], Equals, ddlInfo.routedDDL)
}
Expand Down Expand Up @@ -637,6 +638,58 @@ func (s *testDDLSuite) TestClearOnlineDDL(c *C) {
c.Assert(mock.toFinish, HasLen, 0)
}

func (s *testDDLSuite) TestAdjustCollation(c *C) {
// duplicate with pkg/parser
sqls := []string{
"create table `test`.`t1` (id int) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int) CHARSET=utf8mb4",
"create table `test`.`t1` (id int) COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int)",
"create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create database `test` CHARACTER SET=utf8mb4",
"create database `test` COLLATE=utf8mb4_general_ci",
"create database if not exists `test`",
}

expectedSQLs := []string{
"CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT) DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT)",
"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
"CREATE DATABASE `test` COLLATE = utf8mb4_general_ci",
"CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin",
}

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestAdjustTableCollation")))
syncer := NewSyncer(&config.SubTaskConfig{}, nil, nil)
syncer.tctx = tctx
p := parser.New()
tab := &filter.Table{
Schema: "test",
Name: "t",
}
statusVars := []byte{4, 0, 0, 0, 0, 46, 0}
charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"}
for i, sql := range sqls {
ddlInfo := &ddlInfo{
originDDL: sql,
routedDDL: sql,
sourceTables: []*filter.Table{tab},
targetTables: []*filter.Table{tab},
}
stmt, err := p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
c.Assert(stmt, NotNil)
ddlInfo.originStmt = stmt
adjustCollation(tctx, ddlInfo, statusVars, charsetAndDefaultCollationMap)
routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.originStmt, ddlInfo.targetTables)
c.Assert(err, IsNil)
c.Assert(routedDDL, Equals, expectedSQLs[i])
}
}

type mockOnlinePlugin struct {
toFinish map[string]struct{}
}
Expand Down
12 changes: 7 additions & 5 deletions dm/syncer/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,14 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) {
},
}
p := parser.New()
qec := &queryEventContext{
eventContext: &eventContext{tctx: tcontext.Background()},
p: p,
}

for _, ca := range cases {
ddlInfo, err := syncer.genDDLInfo(p, ca.schema, ca.sql)
qec := &queryEventContext{
eventContext: &eventContext{tctx: tcontext.Background()},
p: p,
ddlSchema: ca.schema,
}
ddlInfo, err := syncer.genDDLInfo(qec, ca.sql)
c.Assert(err, IsNil)
qec.ddlSchema = ca.schema
qec.originSQL = ca.sql
Expand Down
Loading

0 comments on commit 64a2f20

Please sign in to comment.