Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add event key output for debezium #11649

Merged
merged 17 commits into from
Oct 21, 2024
25 changes: 25 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,31 @@
return pkeyCols, pkeyColInfos
}

// HandleKeyColDataXInfos returns the columnDataX(s) and colInfo(s) corresponding to the handle key(s)
func (r *RowChangedEvent) HandleKeyColDataXInfos() ([]ColumnDataX, []rowcodec.ColInfo) {
pkeyColDataXs := make([]ColumnDataX, 0)
pkeyColInfos := make([]rowcodec.ColInfo, 0)

var cols []*ColumnData
if r.IsDelete() {
cols = r.PreColumns
} else {
cols = r.Columns
}

Check warning on line 593 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L584-L593

Added lines #L584 - L593 were not covered by tests

tableInfo := r.TableInfo
colInfos := tableInfo.GetColInfosForRowChangedEvent()
for i, col := range cols {
if col != nil && tableInfo.ForceGetColumnFlagType(col.ColumnID).IsHandleKey() {
pkeyColDataXs = append(pkeyColDataXs, GetColumnDataX(col, tableInfo))
pkeyColInfos = append(pkeyColInfos, colInfos[i])
}

Check warning on line 601 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L595-L601

Added lines #L595 - L601 were not covered by tests
}

// It is okay not to have handle keys, so the empty array is an acceptable result
return pkeyColDataXs, pkeyColInfos

Check warning on line 605 in cdc/model/sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/sink.go#L605

Added line #L605 was not covered by tests
}

// ApproximateBytes returns approximate bytes in memory consumed by the event.
func (r *RowChangedEvent) ApproximateBytes() int {
const sizeOfRowEvent = int(unsafe.Sizeof(*r))
Expand Down
68 changes: 13 additions & 55 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,61 +448,19 @@ func mysqlTypeFromTiDBType(tidbType string) byte {
return result
}

const (
replacementChar = "_"
numberPrefix = "_"
)

// sanitizeName escapes not permitted chars for avro
// debezium-core/src/main/java/io/debezium/schema/FieldNameSelector.java
// https://avro.apache.org/docs/current/spec.html#names
func sanitizeName(name string) string {
changed := false
var sb strings.Builder
for i, c := range name {
if i == 0 && (c >= '0' && c <= '9') {
sb.WriteString(numberPrefix)
sb.WriteRune(c)
changed = true
} else if !(c == '_' ||
('a' <= c && c <= 'z') ||
('A' <= c && c <= 'Z') ||
('0' <= c && c <= '9')) {
sb.WriteString(replacementChar)
changed = true
} else {
sb.WriteRune(c)
}
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Name is potentially not safe for serialization, replace it",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName
}

// sanitizeTopic escapes ".", it may have special meanings for sink connectors
func sanitizeTopic(name string) string {
return strings.ReplaceAll(name, ".", replacementChar)
}

// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func escapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
option = strings.ReplaceAll(option, "''", "'")
return option
return strings.ReplaceAll(name, ".", "_")
}

// <empty> | <name>[(<dot><name>)*]
func getAvroNamespace(namespace string, schema string) string {
return sanitizeName(namespace) + "." + sanitizeName(schema)
ns := common.SanitizeName(namespace)
s := common.SanitizeName(schema)
if s != "" {
return ns + "." + s
}
return ns
}

type avroSchema struct {
Expand Down Expand Up @@ -567,7 +525,7 @@ func (a *BatchEncoder) columns2AvroSchema(
) (*avroSchemaTop, error) {
top := &avroSchemaTop{
Tp: "record",
Name: sanitizeName(tableName.Table),
Name: common.SanitizeName(tableName.Table),
Namespace: getAvroNamespace(a.namespace, tableName.Schema),
Fields: nil,
}
Expand All @@ -580,7 +538,7 @@ func (a *BatchEncoder) columns2AvroSchema(
return nil, err
}
field := make(map[string]interface{})
field["name"] = sanitizeName(col.Name)
field["name"] = common.SanitizeName(col.Name)

copied := *col
copied.Value = copied.Default
Expand Down Expand Up @@ -679,9 +637,9 @@ func (a *BatchEncoder) columns2AvroData(

// https: //pkg.go.dev/github.com/linkedin/goavro/v2#Union
if col.Flag.IsNullable() {
ret[sanitizeName(col.Name)] = goavro.Union(str, data)
ret[common.SanitizeName(col.Name)] = goavro.Union(str, data)
} else {
ret[sanitizeName(col.Name)] = data
ret[common.SanitizeName(col.Name)] = data
}
}

Expand Down Expand Up @@ -790,7 +748,7 @@ func (a *BatchEncoder) columnToAvroSchema(
case mysql.TypeEnum, mysql.TypeSet:
es := make([]string, 0, len(ft.GetElems()))
for _, e := range ft.GetElems() {
e = escapeEnumAndSetOptions(e)
e = common.EscapeEnumAndSetOptions(e)
es = append(es, e)
}
return avroSchema{
Expand Down
14 changes: 10 additions & 4 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,14 @@ func TestAvroEnvelope(t *testing.T) {
func TestSanitizeName(t *testing.T) {
t.Parallel()

require.Equal(t, "normalColumnName123", sanitizeName("normalColumnName123"))
require.Equal(t, "normalColumnName123", common.SanitizeName("normalColumnName123"))
require.Equal(
t,
"_1ColumnNameStartWithNumber",
sanitizeName("1ColumnNameStartWithNumber"),
common.SanitizeName("1ColumnNameStartWithNumber"),
)
require.Equal(t, "A_B", sanitizeName("A.B"))
require.Equal(t, "columnNameWith__", sanitizeName("columnNameWith中文"))
require.Equal(t, "A_B", common.SanitizeName("A.B"))
require.Equal(t, "columnNameWith______", common.SanitizeName("columnNameWith中文"))
}

func TestGetAvroNamespace(t *testing.T) {
Expand All @@ -335,6 +335,12 @@ func TestGetAvroNamespace(t *testing.T) {
"N_amespace.S_chema",
getAvroNamespace("N-amespace", "S.chema"),
)

require.Equal(
t,
"normalNamespace",
getAvroNamespace("normalNamespace", ""),
)
}

func TestArvoAppendRowChangedEventWithCallback(t *testing.T) {
Expand Down
92 changes: 92 additions & 0 deletions pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"database/sql"
"fmt"
"math"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
Expand Down Expand Up @@ -292,3 +293,94 @@
}
return bytes[pos:]
}

const (
replacementChar = "_"
numberPrefix = 'x'
)

// EscapeEnumAndSetOptions escapes ",", "\" and "”"
// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func EscapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
option = strings.ReplaceAll(option, "''", "'")
return option

Check warning on line 310 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L306-L310

Added lines #L306 - L310 were not covered by tests
}

func isValidFirstCharacter(c rune) bool {
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_'

Check warning on line 314 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L313-L314

Added lines #L313 - L314 were not covered by tests
}

func isValidNonFirstCharacter(c rune) bool {
return isValidFirstCharacter(c) || (c >= '0' && c <= '9')

Check warning on line 318 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L317-L318

Added lines #L317 - L318 were not covered by tests
}

func isValidNonFirstCharacterForTopicName(c rune) bool {
return isValidNonFirstCharacter(c) || c == '.'

Check warning on line 322 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L321-L322

Added lines #L321 - L322 were not covered by tests
}

// SanitizeName escapes not permitted chars
// https://avro.apache.org/docs/1.12.0/specification/#names
// see https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/schema/SchemaNameAdjuster.java
func SanitizeName(name string) string {
changed := false
var sb strings.Builder
for i, c := range name {
if i == 0 && !isValidFirstCharacter(c) {
sb.WriteString(replacementChar)
if c >= '0' && c <= '9' {
sb.WriteRune(c)
}
changed = true
} else if !isValidNonFirstCharacter(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}

Check warning on line 346 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L328-L346

Added lines #L328 - L346 were not covered by tests
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Name is potentially not safe for serialization, replace it",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName

Check warning on line 357 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L349-L357

Added lines #L349 - L357 were not covered by tests
}

// SanitizeTopicName escapes not permitted chars for topic name
// https://github.com/debezium/debezium/blob/main/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java
func SanitizeTopicName(name string) string {
changed := false
var sb strings.Builder
for _, c := range name {
if !isValidNonFirstCharacterForTopicName(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}

Check warning on line 374 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L362-L374

Added lines #L362 - L374 were not covered by tests
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Table name sanitize",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName

Check warning on line 385 in pkg/sink/codec/common/helper.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/codec/common/helper.go#L377-L385

Added lines #L377 - L385 were not covered by tests
}
Loading
Loading