Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add event key output for debezium #11649

Merged
merged 17 commits into from
Oct 21, 2024
10 changes: 5 additions & 5 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,8 +1350,8 @@
return x.info
}

// Columns2ColumnDataForTest is for tests.
func Columns2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) {
// Columns2ColumnData converts column to column data and table info.
func Columns2ColumnData(columns []*Column) ([]*ColumnData, *TableInfo) {

Check warning on line 1354 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L1354

Added line #L1354 was not covered by tests
info := &TableInfo{
TableInfo: &model.TableInfo{
Columns: make([]*model.ColumnInfo, len(columns)),
Expand Down Expand Up @@ -1381,8 +1381,8 @@
return colDatas, info
}

// Column2ColumnDataXForTest is for tests.
func Column2ColumnDataXForTest(column *Column) ColumnDataX {
datas, info := Columns2ColumnDataForTest([]*Column{column})
// Column2ColumnDataX converts a single column to ColumnDataX. It is only used by the debezium protocol.
func Column2ColumnDataX(column *Column) ColumnDataX {
datas, info := Columns2ColumnData([]*Column{column})

Check warning on line 1386 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L1385-L1386

Added lines #L1385 - L1386 were not covered by tests
return GetColumnDataX(datas[0], info)
}
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/txn/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestGenKeyListCaseInSensitive(t *testing.T) {
t.Parallel()

columns, tb := model.Columns2ColumnDataForTest([]*model.Column{
columns, tb := model.Columns2ColumnData([]*model.Column{
{
Value: "XyZ",
Type: mysql.TypeVarchar,
Expand All @@ -35,7 +35,7 @@ func TestGenKeyListCaseInSensitive(t *testing.T) {

first := genKeyList(columns, tb, 0, []int{0}, 1)

columns, tb = model.Columns2ColumnDataForTest([]*model.Column{
columns, tb = model.Columns2ColumnData([]*model.Column{
{
Value: "xYZ",
Type: mysql.TypeVarchar,
Expand Down
10 changes: 5 additions & 5 deletions cdc/sink/dmlsink/txn/mysql/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ func TestPrepareUpdate(t *testing.T) {
},
}
for _, tc := range testCases {
preDatas, info := model.Columns2ColumnDataForTest(tc.preCols)
datas, _ := model.Columns2ColumnDataForTest(tc.cols)
preDatas, info := model.Columns2ColumnData(tc.preCols)
datas, _ := model.Columns2ColumnData(tc.cols)
query, args := prepareUpdate(tc.quoteTable, preDatas, datas, info, false)
require.Equal(t, tc.expectedSQL, query)
require.Equal(t, tc.expectedArgs, args)
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestPrepareDelete(t *testing.T) {
},
}
for _, tc := range testCases {
preDatas, info := model.Columns2ColumnDataForTest(tc.preCols)
preDatas, info := model.Columns2ColumnData(tc.preCols)
query, args := prepareDelete(tc.quoteTable, preDatas, info, false)
require.Equal(t, tc.expectedSQL, query)
require.Equal(t, tc.expectedArgs, args)
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestWhereSlice(t *testing.T) {
},
}
for i, tc := range testCases {
datas, info := model.Columns2ColumnDataForTest(tc.cols)
datas, info := model.Columns2ColumnData(tc.cols)
colNames, args := whereSlice(datas, info, tc.forceReplicate)
require.Equal(t, tc.expectedColNames, colNames, "case %d fails", i)
require.Equal(t, tc.expectedArgs, args)
Expand Down Expand Up @@ -767,7 +767,7 @@ func TestMapReplace(t *testing.T) {
for _, tc := range testCases {
// run multiple times to verify the stability of the column sequence in the query string
for i := 0; i < 10; i++ {
datas, info := model.Columns2ColumnDataForTest(tc.cols)
datas, info := model.Columns2ColumnData(tc.cols)
query, args := prepareReplace(tc.quoteTable, datas, info, false, false)
require.Equal(t, tc.expectedQuery, query)
require.Equal(t, tc.expectedArgs, args)
Expand Down
68 changes: 13 additions & 55 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,61 +448,19 @@ func mysqlTypeFromTiDBType(tidbType string) byte {
return result
}

const (
replacementChar = "_"
numberPrefix = "_"
)

// sanitizeName rewrites name so that it only contains characters permitted in
// an avro name. Every rune other than an ASCII letter, an ASCII digit or '_'
// is replaced by replacementChar, and a leading digit is kept but prefixed
// with numberPrefix. A warning is logged whenever the result differs from
// the input.
// Reference: debezium-core/src/main/java/io/debezium/schema/FieldNameSelector.java
// https://avro.apache.org/docs/current/spec.html#names
func sanitizeName(name string) string {
	var out strings.Builder
	replaced := false
	for pos, r := range name {
		isDigit := '0' <= r && r <= '9'
		isLetter := ('a' <= r && r <= 'z') || ('A' <= r && r <= 'Z')
		switch {
		case pos == 0 && isDigit:
			// A name must not start with a digit: keep it, but prefix it.
			out.WriteString(numberPrefix)
			out.WriteRune(r)
			replaced = true
		case r == '_' || isLetter || isDigit:
			out.WriteRune(r)
		default:
			out.WriteString(replacementChar)
			replaced = true
		}
	}

	result := out.String()
	if replaced {
		log.Warn(
			"Name is potentially not safe for serialization, replace it",
			zap.String("name", name),
			zap.String("replacedName", result),
		)
	}
	return result
}

// sanitizeTopic replaces "." with "_", because "." may have a special meaning for sink connectors.
func sanitizeTopic(name string) string {
return strings.ReplaceAll(name, ".", replacementChar)
}

// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func escapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
option = strings.ReplaceAll(option, "''", "'")
return option
return strings.ReplaceAll(name, ".", "_")
}

// <empty> | <name>[(<dot><name>)*]
func getAvroNamespace(namespace string, schema string) string {
return sanitizeName(namespace) + "." + sanitizeName(schema)
ns := common.SanitizeName(namespace)
s := common.SanitizeName(schema)
if s != "" {
return ns + "." + s
}
return ns
}

type avroSchema struct {
Expand Down Expand Up @@ -567,7 +525,7 @@ func (a *BatchEncoder) columns2AvroSchema(
) (*avroSchemaTop, error) {
top := &avroSchemaTop{
Tp: "record",
Name: sanitizeName(tableName.Table),
Name: common.SanitizeName(tableName.Table),
Namespace: getAvroNamespace(a.namespace, tableName.Schema),
Fields: nil,
}
Expand All @@ -580,7 +538,7 @@ func (a *BatchEncoder) columns2AvroSchema(
return nil, err
}
field := make(map[string]interface{})
field["name"] = sanitizeName(col.Name)
field["name"] = common.SanitizeName(col.Name)

copied := *col
copied.Value = copied.Default
Expand Down Expand Up @@ -679,9 +637,9 @@ func (a *BatchEncoder) columns2AvroData(

// https://pkg.go.dev/github.com/linkedin/goavro/v2#Union
if col.Flag.IsNullable() {
ret[sanitizeName(col.Name)] = goavro.Union(str, data)
ret[common.SanitizeName(col.Name)] = goavro.Union(str, data)
} else {
ret[sanitizeName(col.Name)] = data
ret[common.SanitizeName(col.Name)] = data
}
}

Expand Down Expand Up @@ -790,7 +748,7 @@ func (a *BatchEncoder) columnToAvroSchema(
case mysql.TypeEnum, mysql.TypeSet:
es := make([]string, 0, len(ft.GetElems()))
for _, e := range ft.GetElems() {
e = escapeEnumAndSetOptions(e)
e = common.EscapeEnumAndSetOptions(e)
es = append(es, e)
}
return avroSchema{
Expand Down
14 changes: 10 additions & 4 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,14 @@ func TestAvroEnvelope(t *testing.T) {
func TestSanitizeName(t *testing.T) {
t.Parallel()

require.Equal(t, "normalColumnName123", sanitizeName("normalColumnName123"))
require.Equal(t, "normalColumnName123", common.SanitizeName("normalColumnName123"))
require.Equal(
t,
"_1ColumnNameStartWithNumber",
sanitizeName("1ColumnNameStartWithNumber"),
common.SanitizeName("1ColumnNameStartWithNumber"),
)
require.Equal(t, "A_B", sanitizeName("A.B"))
require.Equal(t, "columnNameWith__", sanitizeName("columnNameWith中文"))
require.Equal(t, "A_B", common.SanitizeName("A.B"))
require.Equal(t, "columnNameWith______", common.SanitizeName("columnNameWith中文"))
}

func TestGetAvroNamespace(t *testing.T) {
Expand All @@ -335,6 +335,12 @@ func TestGetAvroNamespace(t *testing.T) {
"N_amespace.S_chema",
getAvroNamespace("N-amespace", "S.chema"),
)

require.Equal(
t,
"normalNamespace",
getAvroNamespace("normalNamespace", ""),
)
}

func TestArvoAppendRowChangedEventWithCallback(t *testing.T) {
Expand Down
92 changes: 92 additions & 0 deletions pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"database/sql"
"fmt"
"math"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
Expand Down Expand Up @@ -292,3 +293,94 @@
}
return bytes[pos:]
}

const (
replacementChar = "_"
numberPrefix = 'x'
)

// EscapeEnumAndSetOptions escapes "," as "\," and unescapes "\'" and "''" to "'".
// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func EscapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
option = strings.ReplaceAll(option, "''", "'")
return option

Check warning on line 310 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L306-L310

Added lines #L306 - L310 were not covered by tests
}

func isValidFirstCharacter(c rune) bool {
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_'

Check warning on line 314 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L313-L314

Added lines #L313 - L314 were not covered by tests
}

func isValidNonFirstCharacter(c rune) bool {
return isValidFirstCharacter(c) || (c >= '0' && c <= '9')

Check warning on line 318 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L317-L318

Added lines #L317 - L318 were not covered by tests
}

func isValidNonFirstCharacterForTopicName(c rune) bool {
return isValidNonFirstCharacter(c) || c == '.'

Check warning on line 322 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L321-L322

Added lines #L321 - L322 were not covered by tests
}

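// SanitizeName replaces characters that are not permitted in a name with "_".
// Only ASCII letters, ASCII digits and "_" are kept; a leading digit is kept but
// prefixed with "_", and any other replaced character yields one "_" per UTF-8 byte
// (a single "_" when it is the first character).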
// https://avro.apache.org/docs/1.12.0/specification/#names
// see https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/schema/SchemaNameAdjuster.java
func SanitizeName(name string) string {
changed := false
var sb strings.Builder
for i, c := range name {
if i == 0 && !isValidFirstCharacter(c) {
sb.WriteString(replacementChar)
if c >= '0' && c <= '9' {
sb.WriteRune(c)
}
changed = true
} else if !isValidNonFirstCharacter(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}

Check warning on line 346 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L328-L346

Added lines #L328 - L346 were not covered by tests
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Name is potentially not safe for serialization, replace it",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName

Check warning on line 357 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L349-L357

Added lines #L349 - L357 were not covered by tests
}

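// SanitizeTopicName replaces characters that are not permitted in a topic name with "_".
// ASCII letters, ASCII digits, "_" and "." are kept; every other character yields one "_" per UTF-8 byte.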
// https://github.com/debezium/debezium/blob/main/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java
func SanitizeTopicName(name string) string {
changed := false
var sb strings.Builder
for _, c := range name {
if !isValidNonFirstCharacterForTopicName(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}

Check warning on line 374 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L362-L374

Added lines #L362 - L374 were not covered by tests
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Table name sanitize",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName

Check warning on line 385 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L377-L385

Added lines #L377 - L385 were not covered by tests
}
2 changes: 1 addition & 1 deletion pkg/sink/codec/csv/csv_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ func TestConvertToCSVType(t *testing.T) {
for _, group := range csvTestColumnsGroup {
for _, c := range group {
cfg := &common.Config{BinaryEncodingMethod: c.BinaryEncodingMethod}
col := model.Column2ColumnDataXForTest(&c.col)
col := model.Column2ColumnDataX(&c.col)
val, _ := fromColValToCsvVal(cfg, col, c.colInfo.Ft)
require.Equal(t, c.want, val, c.col.Name)
}
Expand Down
Loading
Loading