Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
syncer: make table name as the prefix of the key (#588)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Apr 9, 2020
1 parent 13035b9 commit 562a126
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
12 changes: 6 additions & 6 deletions syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func genInsertSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e
}

sql := fmt.Sprintf("%s INTO `%s`.`%s` (%s) VALUES (%s);", insertOrReplace, schema, table, columnList, columnPlaceholders)
ks := genMultipleKeys(originalColumns, originalValue, originalIndexColumns)
ks := genMultipleKeys(originalColumns, originalValue, originalIndexColumns, dbutil.TableName(schema, table))
sqls = append(sqls, sql)
values = append(values, value)
keys = append(keys, ks)
Expand Down Expand Up @@ -194,8 +194,8 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e
defaultIndexColumns = getAvailableIndexColumn(originalIndexColumns, oriOldValues)
}

ks := genMultipleKeys(originalColumns, oriOldValues, originalIndexColumns)
ks = append(ks, genMultipleKeys(originalColumns, oriChangedValues, originalIndexColumns)...)
ks := genMultipleKeys(originalColumns, oriOldValues, originalIndexColumns, dbutil.TableName(schema, table))
ks = append(ks, genMultipleKeys(originalColumns, oriChangedValues, originalIndexColumns, dbutil.TableName(schema, table))...)

if safeMode {
// generate delete sql from old data
Expand Down Expand Up @@ -267,7 +267,7 @@ func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e
if len(defaultIndexColumns) == 0 {
defaultIndexColumns = getAvailableIndexColumn(indexColumns, value)
}
ks := genMultipleKeys(columns, value, indexColumns)
ks := genMultipleKeys(columns, value, indexColumns, dbutil.TableName(schema, table))

sql, value := genDeleteSQL(schema, table, value, columns, defaultIndexColumns)
sqls = append(sqls, sql)
Expand Down Expand Up @@ -422,11 +422,11 @@ func genKeyList(columns []*column, dataSeq []interface{}) string {
return strings.Join(values, ",")
}

func genMultipleKeys(columns []*column, value []interface{}, indexColumns map[string][]*column) []string {
func genMultipleKeys(columns []*column, value []interface{}, indexColumns map[string][]*column, table string) []string {
multipleKeys := make([]string, 0, len(indexColumns))
for _, indexCols := range indexColumns {
cols, vals := getColumnData(columns, indexCols, value)
multipleKeys = append(multipleKeys, genKeyList(cols, vals))
multipleKeys = append(multipleKeys, table+genKeyList(cols, vals))
}
return multipleKeys
}
Expand Down
2 changes: 1 addition & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ func (s *Syncer) resolveCasuality(keys []string) (string, error) {
}

if s.c.detectConflict(keys) {
s.tctx.L().Debug("meet causality key, will generate a flush job and wait all sqls executed", zap.String("feature", "conflict detect"))
s.tctx.L().Debug("meet causality key, will generate a flush job and wait all sqls executed", zap.Strings("keys", keys))
if err := s.flushJobs(); err != nil {
return "", err
}
Expand Down

0 comments on commit 562a126

Please sign in to comment.