From 6a019f54be084239d0f017263f8dbf8dfac6e4cc Mon Sep 17 00:00:00 2001
From: hongyunyan <649330952@qq.com>
Date: Wed, 5 Feb 2025 21:25:00 +0800
Subject: [PATCH 1/6] update

---
 .../fail_over_ddl_mix/run.sh                  | 79 ++++++++++++++++++-
 .../partition_table/data/prepare.sql          |  1 +
 2 files changed, 76 insertions(+), 4 deletions(-)

diff --git a/tests/integration_tests/fail_over_ddl_mix/run.sh b/tests/integration_tests/fail_over_ddl_mix/run.sh
index 12d1d8246..4c778e3a6 100644
--- a/tests/integration_tests/fail_over_ddl_mix/run.sh
+++ b/tests/integration_tests/fail_over_ddl_mix/run.sh
@@ -38,13 +38,27 @@ function prepare() {
 }
 
 function create_tables() {
+	## normal tables
 	for i in {1..5}; do
 		echo "Creating table table_$i..."
 		run_sql "CREATE TABLE IF NOT EXISTS test.table_$i (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	done
+
+	## partition tables
+	for i in {6..10}; do
+		echo "Creating partition table table_$i..."
+		run_sql "CREATE TABLE IF NOT EXISTS test.table_$i (
+			id INT AUTO_INCREMENT PRIMARY KEY,
+			data VARCHAR(255)
+		)
+		PARTITION BY RANGE (id) (
+			PARTITION p0 VALUES LESS THAN (200),
+			PARTITION p1 VALUES LESS THAN (1000),
+			PARTITION p2 VALUES LESS THAN MAXVALUE);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	done
 }
 
-function execute_ddl() {
+function execute_ddl_for_normal_tables() {
 	while true; do
 		table_num=$((RANDOM % 5 + 1))
 		table_name="table_$table_num"
@@ -72,6 +86,51 @@ function execute_ddl() {
 	done
 }
 
+function execute_ddl_for_partition_tables() {
+	while true; do
+		table_num=$((RANDOM % 5 + 6))
+		table_name="table_$table_num"
+
+		case $((RANDOM % 5)) in
+		0)
+			echo "DDL: Dropping and re-adding partition p2 on $table_name..."
+			run_sql "ALTER TABLE test.$table_name DROP PARTITION p2;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+			sleep 0.5
+			run_sql "ALTER TABLE test.$table_name ADD PARTITION (PARTITION p2 VALUES LESS THAN MAXVALUE);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+			;;
+		1)
+			echo "DDL: Truncating partitions of $table_name..."
+			run_sql "ALTER TABLE test.$table_name TRUNCATE PARTITION p0;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+			run_sql "ALTER TABLE test.$table_name TRUNCATE PARTITION p1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+			run_sql "ALTER TABLE test.$table_name TRUNCATE PARTITION p2;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+			;;
+		2)
+			echo "DDL: Removing and re-applying partitioning on $table_name..."
+			run_sql "ALTER TABLE test.$table_name REMOVE PARTITIONING;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+			sleep 0.5
+			run_sql "ALTER TABLE test.$table_name PARTITION BY RANGE (id) (
+			PARTITION p0 VALUES LESS THAN (200),
+			PARTITION p1 VALUES LESS THAN (500),
+			PARTITION p2 VALUES LESS THAN MAXVALUE);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+			;;
+		3)
+			echo "DDL: Exchanging partition p0 of $table_name with a normal table..."
+			exchange_table=$((RANDOM % 5 + 1))
+			exchange_table_name="table_$exchange_table"
+			run_sql_ignore_error "ALTER TABLE test.$table_name EXCHANGE PARTITION p0 WITH TABLE test.$exchange_table_name" ${UP_TIDB_HOST} ${UP_TIDB_PORT} || true
+			echo "Exchange Partition Finished"
+			;;
+		4)
+			echo "DDL: Reorganizing partition p0 of $table_name..."
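+			# REORGANIZE below first splits p0 into p00/p01/p02, then merges them
+			# back, so the partition layout returns to its original shape before
+			# the next loop iteration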
+ run_sql "ALTER TABLE test.$table_name REORGANIZE PARTITION p0 INTO (PARTITION p00 VALUES LESS THAN (50), PARTITION p01 VALUES LESS THAN (100), PARTITION p02 VALUES LESS THAN (200))" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 0.5 + run_sql "ALTER TABLE test.$table_name REORGANIZE PARTITION p00, p01, p02 INTO (PARTITION p0 VALUES LESS THAN (200))" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + ;; + esac + sleep 1 + done +} + function execute_dml() { table_name="table_$1" echo "DML: Inserting data into $table_name..." @@ -112,8 +171,10 @@ main() { prepare create_tables - execute_ddl & - DDL_PID=$! + execute_ddl_for_normal_tables & + NORMAL_TABLE_DDL_PID=$! + execute_ddl_for_partition_tables & + PARTITION_TABLE_DDL_PID=$! # 启动 DML 线程 execute_dml 1 & @@ -126,12 +187,22 @@ main() { DML_PID_4=$! execute_dml 5 & DML_PID_5=$! + execute_dml 6 & + DML_PID_6=$! + execute_dml 7 & + DML_PID_7=$! + execute_dml 8 & + DML_PID_8=$! + execute_dml 9 & + DML_PID_9=$! + execute_dml 10 & + DML_PID_10=$! kill_server sleep 10 - kill -9 $DDL_PID $DML_PID_1 $DML_PID_2 $DML_PID_3 $DML_PID_4 $DML_PID_5 + kill -9 $NORMAL_TABLE_DDL_PID $PARTITION_TABLE_DDL_PID $DML_PID_1 $DML_PID_2 $DML_PID_3 $DML_PID_4 $DML_PID_5 $DML_PID_6 $DML_PID_7 $DML_PID_8 $DML_PID_9 $DML_PID_10 sleep 10 diff --git a/tests/integration_tests/partition_table/data/prepare.sql b/tests/integration_tests/partition_table/data/prepare.sql index bde99a305..0b660c32b 100644 --- a/tests/integration_tests/partition_table/data/prepare.sql +++ b/tests/integration_tests/partition_table/data/prepare.sql @@ -48,6 +48,7 @@ ALTER TABLE t1 REORGANIZE PARTITION p2,p3,p4 INTO (PARTITION p2 VALUES LESS THAN insert into t1 values (-3),(5),(14),(22),(30),(100); update t1 set a=a-16 where a=12; delete from t1 where a = 29; +ALTER TABLE t1 REORGANIZE PARTITION p2,p3,p4,pMax INTO (PARTITION pRest VALUES LESS THAN (MAXVALUE)); /* Change partitioning to key based and then back to range */ alter table t1 partition by key(a) partitions 7; From d5d2a937bf077ab3d278e2ec388b9b9dc0eb2d07 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 6 Feb 2025 11:19:05 +0800 Subject: [PATCH 2/6] update --- pkg/common/event/ddl_event.go | 9 +++++++++ pkg/common/event/interface.go | 1 + pkg/common/event/sync_point_event.go | 4 ++++ pkg/sink/mysql/mysql_writer_for_ddl_ts.go | 17 +++++++---------- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index f3dc27ba6..a4f43298a 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -72,6 +72,11 @@ type DDLEvent struct { TableNameChange *TableNameChange `json:"table_name_change"` TiDBOnly bool `json:"tidb_only"` + + // the tableID for the ddl job in the information_schema.ddl_jobs table(just ddl job.TableID) + // for the partition table, the TableIDInDDLJob is always the logical table id + // for truncate table, the TableIDInDDLJob is the table id of the old table + TableIDInDDLJob int64 `json:"table_id_in_ddl_job"` // Call when event flush is completed PostTxnFlushed []func() `json:"-"` // eventSize is the size of the event in bytes. It is set when it's unmarshaled. 
@@ -307,6 +312,10 @@ func (t *DDLEvent) IsPaused() bool { return t.State.IsPaused() } +func (t *DDLEvent) GetTableIDInDDLJob() int64 { + return t.TableIDInDDLJob +} + type SchemaTableName struct { SchemaName string TableName string diff --git a/pkg/common/event/interface.go b/pkg/common/event/interface.go index 74a7be353..f13ced8d5 100644 --- a/pkg/common/event/interface.go +++ b/pkg/common/event/interface.go @@ -48,6 +48,7 @@ type BlockEvent interface { GetNeedDroppedTables() *InfluencedTables GetNeedAddedTables() []Table GetUpdatedSchemas() []SchemaIDChange + GetTableIDInDDLJob() int64 } const ( diff --git a/pkg/common/event/sync_point_event.go b/pkg/common/event/sync_point_event.go index 4dcc80aee..bfd9411ee 100644 --- a/pkg/common/event/sync_point_event.go +++ b/pkg/common/event/sync_point_event.go @@ -102,3 +102,7 @@ func (e *SyncPointEvent) PushFrontFlushFunc(f func()) { func (e *SyncPointEvent) ClearPostFlushFunc() { e.PostTxnFlushed = e.PostTxnFlushed[:0] } + +func (e *SyncPointEvent) GetTableIDInDDLJob() int64 { + return 0 +} diff --git a/pkg/sink/mysql/mysql_writer_for_ddl_ts.go b/pkg/sink/mysql/mysql_writer_for_ddl_ts.go index e8ad9ae10..e5ad04a2a 100644 --- a/pkg/sink/mysql/mysql_writer_for_ddl_ts.go +++ b/pkg/sink/mysql/mysql_writer_for_ddl_ts.go @@ -92,12 +92,14 @@ func (w *MysqlWriter) SendDDLTsPre(event commonEvent.BlockEvent) error { tableIds = append(tableIds, table.TableID) } + TableIDInDDLJob := event.GetTableIDInDDLJob() + if len(tableIds) > 0 { isSyncpoint := "1" if event.GetType() == commonEvent.TypeDDLEvent { isSyncpoint = "0" } - query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "0", isSyncpoint) + query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "0", isSyncpoint, TableIDInDDLJob) log.Info("send ddl ts table query", zap.String("query", query)) _, err = tx.Exec(query) @@ -163,12 +165,14 @@ func (w *MysqlWriter) SendDDLTs(event commonEvent.BlockEvent) error { tableIds = append(tableIds, table.TableID) } + tableIDInDDLJob := event.GetTableIDInDDLJob() + if len(tableIds) > 0 { isSyncpoint := "1" if event.GetType() == commonEvent.TypeDDLEvent { isSyncpoint = "0" } - query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "1", isSyncpoint) + query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "1", isSyncpoint, tableIDInDDLJob) log.Info("send ddl ts table query", zap.String("query", query)) _, err = tx.Exec(query) @@ -204,14 +208,7 @@ func (w *MysqlWriter) SendDDLTs(event commonEvent.BlockEvent) error { return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write ddl ts table; Commit Fail;")) } -func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID string, ddlTs string, finished string, isSyncpoint string) string { - // choose one related table_id to help table trigger event dispatcher to find the ddl jobs. 
- relatedTableID := tableIds[0] // TODO: relatedID need to be selected carefully, especially for partitioned tables - if relatedTableID == 0 { - if len(tableIds) > 1 { - relatedTableID = tableIds[1] - } - } +func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID string, ddlTs string, finished string, isSyncpoint string, relatedTableID int64) string { var builder strings.Builder builder.WriteString("INSERT INTO ") builder.WriteString(filter.TiCDCSystemSchema) From 1448f2037f2d535090437770bbae01c9fee87d09 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Fri, 7 Feb 2025 18:59:48 +0800 Subject: [PATCH 3/6] support TableIDInDDLJob in schema store --- .../persist_storage_ddl_handlers.go | 10 ++-- logservice/schemastore/types.go | 7 ++- logservice/schemastore/types_gen.go | 49 ++++++++++++++----- 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index c033fdbdf..cb419d1df 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -478,6 +478,7 @@ func buildPersistedDDLEventCommon(args buildPersistedDDLEventFuncArgs) Persisted event := PersistedDDLEvent{ ID: job.ID, Type: byte(job.Type), + TableIDInDDLJob: job.TableID, CurrentSchemaID: job.SchemaID, CurrentTableID: job.TableID, Query: query, @@ -1407,10 +1408,11 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, SchemaName: rawEvent.CurrentSchemaName, TableName: rawEvent.CurrentTableName, - Query: rawEvent.Query, - TableInfo: wrapTableInfo, - FinishedTs: rawEvent.FinishedTs, - TiDBOnly: tiDBOnly, + Query: rawEvent.Query, + TableInfo: wrapTableInfo, + FinishedTs: rawEvent.FinishedTs, + TiDBOnly: tiDBOnly, + TableIDInDDLJob: rawEvent.TableIDInDDLJob, }, !filtered } diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 42f5920a5..a3ac9bdd6 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -25,6 +25,11 @@ type PersistedDDLEvent struct { ID int64 `msg:"id"` Type byte `msg:"type"` + // the tableID for the ddl job in the information_schema.ddl_jobs table(just ddl job.TableID) + // for the partition table, the TableIDInDDLJob is always the logical table id + // for truncate table, the TableIDInDDLJob is the table id of the old table + TableIDInDDLJob int64 `msg:"table_id_in_ddl_job"` + // for exchange partition, it is the info of the partition table CurrentSchemaID int64 `msg:"current_schema_id"` CurrentTableID int64 `msg:"current_table_id"` @@ -43,7 +48,7 @@ type PersistedDDLEvent struct { PrevSchemaNames []string `msg:"prev_schema_names"` PrevTableNames []string `msg:"prev_table_names"` CurrentSchemaIDs []int64 `msg:"current_schema_ids"` - CurrentSchemaNames []string `msg:"s"` + CurrentSchemaNames []string `msg:"current_schema_names"` // The following fields are only set when the ddl job involves a partition table PrevPartitions []int64 `msg:"prev_partitions"` diff --git a/logservice/schemastore/types_gen.go b/logservice/schemastore/types_gen.go index 88d607847..83b9f2f3a 100644 --- a/logservice/schemastore/types_gen.go +++ b/logservice/schemastore/types_gen.go @@ -36,6 +36,12 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Type") return } + case "table_id_in_ddl_job": + z.TableIDInDDLJob, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "TableIDInDDLJob") + return + } case 
"current_schema_id": z.CurrentSchemaID, err = dc.ReadInt64() if err != nil { @@ -160,7 +166,7 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { return } } - case "s": + case "current_schema_names": var zb0006 uint32 zb0006, err = dc.ReadArrayHeader() if err != nil { @@ -272,9 +278,9 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 24 + // map header, size 25 // write "id" - err = en.Append(0xde, 0x0, 0x18, 0xa2, 0x69, 0x64) + err = en.Append(0xde, 0x0, 0x19, 0xa2, 0x69, 0x64) if err != nil { return } @@ -293,6 +299,16 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Type") return } + // write "table_id_in_ddl_job" + err = en.Append(0xb3, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x5f, 0x69, 0x6e, 0x5f, 0x64, 0x64, 0x6c, 0x5f, 0x6a, 0x6f, 0x62) + if err != nil { + return + } + err = en.WriteInt64(z.TableIDInDDLJob) + if err != nil { + err = msgp.WrapError(err, "TableIDInDDLJob") + return + } // write "current_schema_id" err = en.Append(0xb1, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) if err != nil { @@ -441,8 +457,8 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { return } } - // write "s" - err = en.Append(0xa1, 0x73) + // write "current_schema_names" + err = en.Append(0xb4, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) if err != nil { return } @@ -568,13 +584,16 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 24 + // map header, size 25 // string "id" - o = append(o, 0xde, 0x0, 0x18, 0xa2, 0x69, 0x64) + o = append(o, 0xde, 0x0, 0x19, 0xa2, 0x69, 0x64) o = msgp.AppendInt64(o, z.ID) // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) o = msgp.AppendByte(o, z.Type) + // string "table_id_in_ddl_job" + o = append(o, 0xb3, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x5f, 0x69, 0x6e, 0x5f, 0x64, 0x64, 0x6c, 0x5f, 0x6a, 0x6f, 0x62) + o = msgp.AppendInt64(o, z.TableIDInDDLJob) // string "current_schema_id" o = append(o, 0xb1, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) o = msgp.AppendInt64(o, z.CurrentSchemaID) @@ -623,8 +642,8 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { for za0004 := range z.CurrentSchemaIDs { o = msgp.AppendInt64(o, z.CurrentSchemaIDs[za0004]) } - // string "s" - o = append(o, 0xa1, 0x73) + // string "current_schema_names" + o = append(o, 0xb4, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.CurrentSchemaNames))) for za0005 := range z.CurrentSchemaNames { o = msgp.AppendString(o, z.CurrentSchemaNames[za0005]) @@ -695,6 +714,12 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Type") return } + case "table_id_in_ddl_job": + z.TableIDInDDLJob, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "TableIDInDDLJob") + return + } case "current_schema_id": z.CurrentSchemaID, bts, err = msgp.ReadInt64Bytes(bts) if err 
!= nil { @@ -819,7 +844,7 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { return } } - case "s": + case "current_schema_names": var zb0006 uint32 zb0006, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { @@ -932,7 +957,7 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *PersistedDDLEvent) Msgsize() (s int) { - s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 18 + msgp.Int64Size + 17 + msgp.Int64Size + 20 + msgp.StringPrefixSize + len(z.CurrentSchemaName) + 19 + msgp.StringPrefixSize + len(z.CurrentTableName) + 15 + msgp.Int64Size + 14 + msgp.Int64Size + 17 + msgp.StringPrefixSize + len(z.PrevSchemaName) + 16 + msgp.StringPrefixSize + len(z.PrevTableName) + 16 + msgp.ArrayHeaderSize + (len(z.PrevSchemaIDs) * (msgp.Int64Size)) + 18 + msgp.ArrayHeaderSize + s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 20 + msgp.Int64Size + 18 + msgp.Int64Size + 17 + msgp.Int64Size + 20 + msgp.StringPrefixSize + len(z.CurrentSchemaName) + 19 + msgp.StringPrefixSize + len(z.CurrentTableName) + 15 + msgp.Int64Size + 14 + msgp.Int64Size + 17 + msgp.StringPrefixSize + len(z.PrevSchemaName) + 16 + msgp.StringPrefixSize + len(z.PrevTableName) + 16 + msgp.ArrayHeaderSize + (len(z.PrevSchemaIDs) * (msgp.Int64Size)) + 18 + msgp.ArrayHeaderSize for za0002 := range z.PrevSchemaNames { s += msgp.StringPrefixSize + len(z.PrevSchemaNames[za0002]) } @@ -940,7 +965,7 @@ func (z *PersistedDDLEvent) Msgsize() (s int) { for za0003 := range z.PrevTableNames { s += msgp.StringPrefixSize + len(z.PrevTableNames[za0003]) } - s += 19 + msgp.ArrayHeaderSize + (len(z.CurrentSchemaIDs) * (msgp.Int64Size)) + 2 + msgp.ArrayHeaderSize + s += 19 + msgp.ArrayHeaderSize + (len(z.CurrentSchemaIDs) * (msgp.Int64Size)) + 21 + msgp.ArrayHeaderSize for za0005 := range z.CurrentSchemaNames { s += msgp.StringPrefixSize + len(z.CurrentSchemaNames[za0005]) } From 9fbd9f59fac78b2639b46f50e31a1abf0c1991a8 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 12 Feb 2025 10:12:01 +0800 Subject: [PATCH 4/6] update --- .github/workflows/integration_test_mysql.yaml | 45 ++++++++++++++++ pkg/sink/mysql/mysql_writer_for_ddl_ts.go | 51 ++++++++++--------- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 285f7c9e7..dbde0d0ce 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -546,6 +546,51 @@ jobs: make integration_test_build ls -l bin/ && ls -l tools/bin/ + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd 
&& ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + + - name: Test fail_over_ddl_mix + run: | + pwd && ls -l bin/ && ls -l tools/bin/ + export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix + - name: Test fail_over run: | pwd && ls -l bin/ && ls -l tools/bin/ diff --git a/pkg/sink/mysql/mysql_writer_for_ddl_ts.go b/pkg/sink/mysql/mysql_writer_for_ddl_ts.go index e5ad04a2a..16a1e40ce 100644 --- a/pkg/sink/mysql/mysql_writer_for_ddl_ts.go +++ b/pkg/sink/mysql/mysql_writer_for_ddl_ts.go @@ -92,14 +92,14 @@ func (w *MysqlWriter) SendDDLTsPre(event commonEvent.BlockEvent) error { tableIds = append(tableIds, table.TableID) } - TableIDInDDLJob := event.GetTableIDInDDLJob() - if len(tableIds) > 0 { isSyncpoint := "1" + DDLQuery := "" if event.GetType() == commonEvent.TypeDDLEvent { isSyncpoint = "0" + DDLQuery = event.(*commonEvent.DDLEvent).Query } - query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "0", isSyncpoint, TableIDInDDLJob) + query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "0", isSyncpoint, DDLQuery) log.Info("send ddl ts table query", zap.String("query", query)) _, err = tx.Exec(query) @@ -165,14 +165,14 @@ func (w *MysqlWriter) SendDDLTs(event commonEvent.BlockEvent) error { tableIds = append(tableIds, table.TableID) } - tableIDInDDLJob := event.GetTableIDInDDLJob() - if len(tableIds) > 0 { isSyncpoint := "1" + ddlQuery := "" if event.GetType() == commonEvent.TypeDDLEvent { isSyncpoint = "0" + ddlQuery = event.(*commonEvent.DDLEvent).Query } - query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "1", isSyncpoint, tableIDInDDLJob) + query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "1", isSyncpoint, ddlQuery) log.Info("send ddl ts table query", zap.String("query", query)) _, err = tx.Exec(query) @@ -208,13 +208,13 @@ func (w *MysqlWriter) SendDDLTs(event commonEvent.BlockEvent) error { return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write ddl ts table; Commit Fail;")) } -func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID string, ddlTs string, finished string, isSyncpoint string, relatedTableID int64) string { +func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID string, ddlTs string, finished string, isSyncpoint string, query string) string { var builder strings.Builder builder.WriteString("INSERT INTO ") builder.WriteString(filter.TiCDCSystemSchema) builder.WriteString(".") builder.WriteString(filter.DDLTsTable) - builder.WriteString(" (ticdc_cluster_id, changefeed, ddl_ts, table_id, related_table_id, finished, is_syncpoint) VALUES ") + builder.WriteString(" (ticdc_cluster_id, changefeed, ddl_ts, table_id, query, finished, is_syncpoint) VALUES ") for idx, tableId := range tableIds { builder.WriteString("('") @@ -225,9 +225,9 @@ func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID strin builder.WriteString(ddlTs) builder.WriteString("', ") builder.WriteString(strconv.FormatInt(tableId, 10)) - builder.WriteString(", ") - builder.WriteString(strconv.FormatInt(relatedTableID, 10)) - builder.WriteString(", ") 
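+		// the raw DDL statement text is embedded in the row as a quoted literal;
+		// GetStartTsList later matches it against the QUERY column of information_schema.ddl_jobs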
+ builder.WriteString(", '") + builder.WriteString(query) + builder.WriteString("', ") builder.WriteString(finished) builder.WriteString(", ") builder.WriteString(isSyncpoint) @@ -236,7 +236,7 @@ func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID strin builder.WriteString(", ") } } - builder.WriteString(" ON DUPLICATE KEY UPDATE finished=VALUES(finished), related_table_id=VALUES(related_table_id), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);") + builder.WriteString(" ON DUPLICATE KEY UPDATE finished=VALUES(finished), query=VALUES(query), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);") return builder.String() } @@ -304,11 +304,12 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error) } defer rows.Close() - var ddlTs, tableId, relatedTableId int64 + var ddlTs, tableId int64 + var ddlQuery string var finished, isSyncpoint bool var createdAtBytes []byte for rows.Next() { - err := rows.Scan(&tableId, &relatedTableId, &ddlTs, &finished, &createdAtBytes, &isSyncpoint) + err := rows.Scan(&tableId, &ddlQuery, &ddlTs, &finished, &createdAtBytes, &isSyncpoint) if err != nil { return retStartTsList, isSyncpoints, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to check ddl ts table; Query is %s", query))) } @@ -328,7 +329,7 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error) } // query the ddl_jobs table to find whether the ddl is executed and the ddl created time - createdTime, ok := w.queryDDLJobs(relatedTableId) + createdTime, ok := w.queryDDLJobs(ddlQuery) if !ok { retStartTsList[tableIdIdxMap[tableId]] = ddlTs - 1 } else { @@ -336,12 +337,12 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error) // show the ddl is executed retStartTsList[tableIdIdxMap[tableId]] = ddlTs isSyncpoints[tableIdIdxMap[tableId]] = isSyncpoint - log.Debug("createdTime is larger than createdAt", zap.Int64("tableId", tableId), zap.Int64("relatedTableId", relatedTableId), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs)) + log.Debug("createdTime is larger than createdAt", zap.Int64("tableId", tableId), zap.Any("ddlQuery", ddlQuery), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs)) continue } else { // show the ddl is not executed retStartTsList[tableIdIdxMap[tableId]] = ddlTs - 1 - log.Debug("createdTime is less than createdAt", zap.Int64("tableId", tableId), zap.Int64("relatedTableId", relatedTableId), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs-1)) + log.Debug("createdTime is less than createdAt", zap.Int64("tableId", tableId), zap.Any("ddlQuery", ddlQuery), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs-1)) continue } } @@ -353,7 +354,7 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error) func selectDDLTsQuery(tableIDs []int64, ticdcClusterID string, changefeedID string) string { var builder strings.Builder - builder.WriteString("SELECT table_id, related_table_id, ddl_ts, finished, created_at, is_syncpoint FROM ") + builder.WriteString("SELECT table_id, query, ddl_ts, finished, created_at, is_syncpoint FROM ") builder.WriteString(filter.TiCDCSystemSchema) builder.WriteString(".") builder.WriteString(filter.DDLTsTable) @@ -375,11 +376,15 @@ func selectDDLTsQuery(tableIDs []int64, ticdcClusterID string, changefeedID stri return builder.String() } -var queryDDLJobs = `SELECT CREATE_TIME FROM information_schema.ddl_jobs WHERE TABLE_ID = "%s" order by 
CREATE_TIME desc limit 1;` +var queryDDLJobs = `SELECT CREATE_TIME FROM information_schema.ddl_jobs WHERE QUERY = "%s" order by CREATE_TIME desc limit 1;` -func (w *MysqlWriter) queryDDLJobs(tableID int64) (time.Time, bool) { +func (w *MysqlWriter) queryDDLJobs(ddlQuery string) (time.Time, bool) { + if ddlQuery == "" { + log.Info("ddlQuery is empty") + return time.Time{}, false + } // query the ddl_jobs table to find whether the ddl is executed - query := fmt.Sprintf(queryDDLJobs, strconv.FormatInt(tableID, 10)) + query := fmt.Sprintf(queryDDLJobs, ddlQuery) log.Info("query the info from ddl jobs", zap.String("query", query)) start := time.Now() @@ -406,7 +411,7 @@ func (w *MysqlWriter) queryDDLJobs(tableID int64) (time.Time, bool) { } return createdTime, true } - log.Debug("no ddl job item", zap.Int64("relatedTableId", tableID)) + log.Debug("no ddl job item", zap.Any("query", ddlQuery)) return time.Time{}, false } @@ -548,7 +553,7 @@ func (w *MysqlWriter) createDDLTsTable() error { ddl_ts varchar(18), table_id bigint(21), finished bool, - related_table_id bigint(21), + query varchar(1024), is_syncpoint bool, created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, INDEX (ticdc_cluster_id, changefeed, table_id), From 446157f0f8291cd38df1e1a0275960480d57df1f Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 12 Feb 2025 10:18:10 +0800 Subject: [PATCH 5/6] update --- pkg/common/event/ddl_event.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 3ff1ea87c..bd0ad4506 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -75,7 +75,6 @@ type DDLEvent struct { TableNameChange *TableNameChange `json:"table_name_change"` TiDBOnly bool `json:"tidb_only"` - // Call when event flush is completed PostTxnFlushed []func() `json:"-"` // eventSize is the size of the event in bytes. It is set when it's unmarshaled. From 907250e4c9e7d47815ba8890bdce98b23b305dd6 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 12 Feb 2025 16:41:03 +0800 Subject: [PATCH 6/6] update --- .../persist_storage_ddl_handlers.go | 27 +++++---- logservice/schemastore/types.go | 5 ++ logservice/schemastore/types_gen.go | 60 +++++++++++++++++-- pkg/common/event/ddl_event.go | 13 ++++ pkg/sink/mysql/mysql_writer_for_ddl_ts.go | 53 +++++++++------- 5 files changed, 119 insertions(+), 39 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index eee6bdd94..4823aad82 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -461,17 +461,19 @@ func buildPersistedDDLEventCommon(args buildPersistedDDLEventFuncArgs) Persisted // Note: if a ddl involve multiple tables, job.TableID is different with job.BinlogInfo.TableInfo.ID // and usually job.BinlogInfo.TableInfo.ID will be the newly created IDs. 
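+	// TableNameInDDLJob and DBNameInDDLJob record job.TableName and job.SchemaName,
+	// which lets the mysql sink look the job up in information_schema.ddl_jobs later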
event := PersistedDDLEvent{ - ID: job.ID, - Type: byte(job.Type), - SchemaID: job.SchemaID, - TableID: job.TableID, - Query: query, - SchemaVersion: job.BinlogInfo.SchemaVersion, - DBInfo: job.BinlogInfo.DBInfo, - TableInfo: job.BinlogInfo.TableInfo, - FinishedTs: job.BinlogInfo.FinishedTS, - BDRRole: job.BDRRole, - CDCWriteSource: job.CDCWriteSource, + ID: job.ID, + Type: byte(job.Type), + TableNameInDDLJob: job.TableName, + DBNameInDDLJob: job.SchemaName, + SchemaID: job.SchemaID, + TableID: job.TableID, + Query: query, + SchemaVersion: job.BinlogInfo.SchemaVersion, + DBInfo: job.BinlogInfo.DBInfo, + TableInfo: job.BinlogInfo.TableInfo, + FinishedTs: job.BinlogInfo.FinishedTS, + BDRRole: job.BDRRole, + CDCWriteSource: job.CDCWriteSource, } return event } @@ -1414,6 +1416,9 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, TableInfo: wrapTableInfo, FinishedTs: rawEvent.FinishedTs, TiDBOnly: tiDBOnly, + + TableNameInDDLJob: rawEvent.TableNameInDDLJob, + DBNameInDDLJob: rawEvent.DBNameInDDLJob, }, !filtered } diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index b69dcb78b..aa17ceab7 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -25,6 +25,11 @@ type PersistedDDLEvent struct { ID int64 `msg:"id"` Type byte `msg:"type"` + // the table name for the ddl job in the information_schema.ddl_jobs table(just ddl job.TableName) + TableNameInDDLJob string `msg:"table_name_in_ddl_job"` + // the database name for the ddl job in the information_schema.ddl_jobs table(just ddl job.dbName) + DBNameInDDLJob string `msg:"db_name_in_ddl_job"` + // SchemaID is from upstream Job.SchemaID, it corresponds to TableID // it is the DB id of the table after the ddl SchemaID int64 `msg:"schema_id"` diff --git a/logservice/schemastore/types_gen.go b/logservice/schemastore/types_gen.go index 2018d8b4b..42790a9fc 100644 --- a/logservice/schemastore/types_gen.go +++ b/logservice/schemastore/types_gen.go @@ -36,6 +36,18 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Type") return } + case "table_name_in_ddl_job": + z.TableNameInDDLJob, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TableNameInDDLJob") + return + } + case "db_name_in_ddl_job": + z.DBNameInDDLJob, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "DBNameInDDLJob") + return + } case "schema_id": z.SchemaID, err = dc.ReadInt64() if err != nil { @@ -272,9 +284,9 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 24 + // map header, size 26 // write "id" - err = en.Append(0xde, 0x0, 0x18, 0xa2, 0x69, 0x64) + err = en.Append(0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) if err != nil { return } @@ -293,6 +305,26 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Type") return } + // write "table_name_in_ddl_job" + err = en.Append(0xb5, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x64, 0x64, 0x6c, 0x5f, 0x6a, 0x6f, 0x62) + if err != nil { + return + } + err = en.WriteString(z.TableNameInDDLJob) + if err != nil { + err = msgp.WrapError(err, "TableNameInDDLJob") + return + } + // write "db_name_in_ddl_job" + err = en.Append(0xb2, 0x64, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x64, 0x64, 0x6c, 0x5f, 0x6a, 0x6f, 0x62) + if err != nil 
{ + return + } + err = en.WriteString(z.DBNameInDDLJob) + if err != nil { + err = msgp.WrapError(err, "DBNameInDDLJob") + return + } // write "schema_id" err = en.Append(0xa9, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) if err != nil { @@ -568,13 +600,19 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 24 + // map header, size 26 // string "id" - o = append(o, 0xde, 0x0, 0x18, 0xa2, 0x69, 0x64) + o = append(o, 0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) o = msgp.AppendInt64(o, z.ID) // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) o = msgp.AppendByte(o, z.Type) + // string "table_name_in_ddl_job" + o = append(o, 0xb5, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x64, 0x64, 0x6c, 0x5f, 0x6a, 0x6f, 0x62) + o = msgp.AppendString(o, z.TableNameInDDLJob) + // string "db_name_in_ddl_job" + o = append(o, 0xb2, 0x64, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x64, 0x64, 0x6c, 0x5f, 0x6a, 0x6f, 0x62) + o = msgp.AppendString(o, z.DBNameInDDLJob) // string "schema_id" o = append(o, 0xa9, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) o = msgp.AppendInt64(o, z.SchemaID) @@ -695,6 +733,18 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Type") return } + case "table_name_in_ddl_job": + z.TableNameInDDLJob, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TableNameInDDLJob") + return + } + case "db_name_in_ddl_job": + z.DBNameInDDLJob, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DBNameInDDLJob") + return + } case "schema_id": z.SchemaID, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { @@ -932,7 +982,7 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *PersistedDDLEvent) Msgsize() (s int) { - s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 10 + msgp.Int64Size + 9 + msgp.Int64Size + 12 + msgp.StringPrefixSize + len(z.SchemaName) + 11 + msgp.StringPrefixSize + len(z.TableName) + 16 + msgp.Int64Size + 15 + msgp.Int64Size + 18 + msgp.StringPrefixSize + len(z.ExtraSchemaName) + 17 + msgp.StringPrefixSize + len(z.ExtraTableName) + 11 + msgp.ArrayHeaderSize + (len(z.SchemaIDs) * (msgp.Int64Size)) + 13 + msgp.ArrayHeaderSize + s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 22 + msgp.StringPrefixSize + len(z.TableNameInDDLJob) + 19 + msgp.StringPrefixSize + len(z.DBNameInDDLJob) + 10 + msgp.Int64Size + 9 + msgp.Int64Size + 12 + msgp.StringPrefixSize + len(z.SchemaName) + 11 + msgp.StringPrefixSize + len(z.TableName) + 16 + msgp.Int64Size + 15 + msgp.Int64Size + 18 + msgp.StringPrefixSize + len(z.ExtraSchemaName) + 17 + msgp.StringPrefixSize + len(z.ExtraTableName) + 11 + msgp.ArrayHeaderSize + (len(z.SchemaIDs) * (msgp.Int64Size)) + 13 + msgp.ArrayHeaderSize for za0002 := range z.SchemaNames { s += msgp.StringPrefixSize + len(z.SchemaNames[za0002]) } diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index bd0ad4506..b124f7466 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -74,6 +74,11 @@ type DDLEvent struct { // Recover Table TableNameChange *TableNameChange `json:"table_name_change"` + // the table name for the ddl 
job in the information_schema.ddl_jobs table (just ddl job.TableName)
+	TableNameInDDLJob string `json:"table_name_in_ddl_job"`
+	// the database name for the ddl job in the information_schema.ddl_jobs table (just ddl job.SchemaName)
+	DBNameInDDLJob string `json:"db_name_in_ddl_job"`
+
 	TiDBOnly bool `json:"tidb_only"`
 	// Call when event flush is completed
 	PostTxnFlushed []func() `json:"-"`
 	// eventSize is the size of the event in bytes. It is set when it's unmarshaled.
@@ -125,6 +130,14 @@ func (d *DDLEvent) GetExtraTableName() string {
 	return d.ExtraTableName
 }
 
+func (d *DDLEvent) GetTableNameInDDLJob() string {
+	return d.TableNameInDDLJob
+}
+
+func (d *DDLEvent) GetDBNameInDDLJob() string {
+	return d.DBNameInDDLJob
+}
+
 func (d *DDLEvent) GetEvents() []*DDLEvent {
 	// Some ddl event may be multi-events, we need to split it into multiple messages.
 	// Such as rename table test.table1 to test.table10, test.table2 to test.table20
diff --git a/pkg/sink/mysql/mysql_writer_for_ddl_ts.go b/pkg/sink/mysql/mysql_writer_for_ddl_ts.go
index 16a1e40ce..cdd6c4070 100644
--- a/pkg/sink/mysql/mysql_writer_for_ddl_ts.go
+++ b/pkg/sink/mysql/mysql_writer_for_ddl_ts.go
@@ -94,12 +94,14 @@ func (w *MysqlWriter) SendDDLTsPre(event commonEvent.BlockEvent) error {
 
 	if len(tableIds) > 0 {
 		isSyncpoint := "1"
-		DDLQuery := ""
+		tableNameInDDLJob := ""
+		dbNameInDDLJob := ""
 		if event.GetType() == commonEvent.TypeDDLEvent {
 			isSyncpoint = "0"
-			DDLQuery = event.(*commonEvent.DDLEvent).Query
+			tableNameInDDLJob = event.(*commonEvent.DDLEvent).GetTableNameInDDLJob()
+			dbNameInDDLJob = event.(*commonEvent.DDLEvent).GetDBNameInDDLJob()
 		}
-		query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "0", isSyncpoint, DDLQuery)
+		query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "0", isSyncpoint, tableNameInDDLJob, dbNameInDDLJob)
 
 		log.Info("send ddl ts table query", zap.String("query", query))
 		_, err = tx.Exec(query)
@@ -167,12 +169,14 @@ func (w *MysqlWriter) SendDDLTs(event commonEvent.BlockEvent) error {
 
 	if len(tableIds) > 0 {
 		isSyncpoint := "1"
-		ddlQuery := ""
+		tableNameInDDLJob := ""
+		dbNameInDDLJob := ""
 		if event.GetType() == commonEvent.TypeDDLEvent {
 			isSyncpoint = "0"
-			ddlQuery = event.(*commonEvent.DDLEvent).Query
+			tableNameInDDLJob = event.(*commonEvent.DDLEvent).GetTableNameInDDLJob()
+			dbNameInDDLJob = event.(*commonEvent.DDLEvent).GetDBNameInDDLJob()
 		}
-		query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "1", isSyncpoint, ddlQuery)
+		query := insertItemQuery(tableIds, ticdcClusterID, changefeedID, ddlTs, "1", isSyncpoint, tableNameInDDLJob, dbNameInDDLJob)
 
 		log.Info("send ddl ts table query", zap.String("query", query))
 		_, err = tx.Exec(query)
@@ -208,13 +212,13 @@ func (w *MysqlWriter) SendDDLTs(event commonEvent.BlockEvent) error {
 	return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write ddl ts table; Commit Fail;"))
 }
 
-func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID string, ddlTs string, finished string, isSyncpoint string, query string) string {
+func insertItemQuery(tableIds []int64, ticdcClusterID, changefeedID, ddlTs, finished, isSyncpoint, tableNameInDDLJob, dbNameInDDLJob string) string {
 	var builder strings.Builder
 	builder.WriteString("INSERT INTO ")
 	builder.WriteString(filter.TiCDCSystemSchema)
 	builder.WriteString(".")
 	builder.WriteString(filter.DDLTsTable)
-	builder.WriteString(" (ticdc_cluster_id, changefeed, ddl_ts, table_id, query, finished, is_syncpoint) VALUES ")
+	builder.WriteString(" (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ")
 
 	for idx, tableId := range tableIds {
 		builder.WriteString("('")
@@ -226,7 +230,9 @@ func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID strin
 		builder.WriteString("', ")
 		builder.WriteString(strconv.FormatInt(tableId, 10))
 		builder.WriteString(", '")
-		builder.WriteString(query)
+		builder.WriteString(tableNameInDDLJob)
+		builder.WriteString("', '")
+		builder.WriteString(dbNameInDDLJob)
 		builder.WriteString("', ")
 		builder.WriteString(finished)
 		builder.WriteString(", ")
@@ -236,7 +242,7 @@ func insertItemQuery(tableIds []int64, ticdcClusterID string, changefeedID strin
 			builder.WriteString(", ")
 		}
 	}
-	builder.WriteString(" ON DUPLICATE KEY UPDATE finished=VALUES(finished), query=VALUES(query), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);")
+	builder.WriteString(" ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);")
 
 	return builder.String()
 }
@@ -305,11 +311,11 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error)
 	defer rows.Close()
 
 	var ddlTs, tableId int64
-	var ddlQuery string
+	var tableNameInDDLJob, dbNameInDDLJob string
 	var finished, isSyncpoint bool
 	var createdAtBytes []byte
 	for rows.Next() {
-		err := rows.Scan(&tableId, &ddlQuery, &ddlTs, &finished, &createdAtBytes, &isSyncpoint)
+		err := rows.Scan(&tableId, &tableNameInDDLJob, &dbNameInDDLJob, &ddlTs, &finished, &createdAtBytes, &isSyncpoint)
 		if err != nil {
 			return retStartTsList, isSyncpoints, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to check ddl ts table; Query is %s", query)))
 		}
@@ -329,7 +335,7 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error)
 		}
 
 		// query the ddl_jobs table to find whether the ddl is executed and the ddl created time
-		createdTime, ok := w.queryDDLJobs(ddlQuery)
+		createdTime, ok := w.queryDDLJobs(dbNameInDDLJob, tableNameInDDLJob)
 		if !ok {
 			retStartTsList[tableIdIdxMap[tableId]] = ddlTs - 1
 		} else {
@@ -337,12 +343,12 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error)
 				// show the ddl is executed
 				retStartTsList[tableIdIdxMap[tableId]] = ddlTs
 				isSyncpoints[tableIdIdxMap[tableId]] = isSyncpoint
-				log.Debug("createdTime is larger than createdAt", zap.Int64("tableId", tableId), zap.Any("ddlQuery", ddlQuery), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs))
+				log.Debug("createdTime is larger than createdAt", zap.Int64("tableId", tableId), zap.String("tableNameInDDLJob", tableNameInDDLJob), zap.String("dbNameInDDLJob", dbNameInDDLJob), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs))
 				continue
 			} else {
 				// show the ddl is not executed
 				retStartTsList[tableIdIdxMap[tableId]] = ddlTs - 1
-				log.Debug("createdTime is less than createdAt", zap.Int64("tableId", tableId), zap.Any("ddlQuery", ddlQuery), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs-1))
+				log.Debug("createdTime is less than createdAt", zap.Int64("tableId", tableId), zap.String("tableNameInDDLJob", tableNameInDDLJob), zap.String("dbNameInDDLJob", dbNameInDDLJob), zap.Int64("ddlTs", ddlTs), zap.Int64("startTs", ddlTs-1))
 				continue
 			}
 		}
@@ -354,7 +360,7 @@ func (w *MysqlWriter) GetStartTsList(tableIDs []int64) ([]int64, []bool, error)
 
 func selectDDLTsQuery(tableIDs []int64, ticdcClusterID string, changefeedID string) string {
 	var builder strings.Builder
- builder.WriteString("SELECT table_id, query, ddl_ts, finished, created_at, is_syncpoint FROM ") + builder.WriteString("SELECT table_id, table_name_in_ddl_job, db_name_in_ddl_job, ddl_ts, finished, created_at, is_syncpoint FROM ") builder.WriteString(filter.TiCDCSystemSchema) builder.WriteString(".") builder.WriteString(filter.DDLTsTable) @@ -376,15 +382,15 @@ func selectDDLTsQuery(tableIDs []int64, ticdcClusterID string, changefeedID stri return builder.String() } -var queryDDLJobs = `SELECT CREATE_TIME FROM information_schema.ddl_jobs WHERE QUERY = "%s" order by CREATE_TIME desc limit 1;` +var queryDDLJobs = `SELECT CREATE_TIME FROM information_schema.ddl_jobs WHERE DB_NAME = '%s' AND TABLE_NAME = '%s' order by CREATE_TIME desc limit 1;` -func (w *MysqlWriter) queryDDLJobs(ddlQuery string) (time.Time, bool) { - if ddlQuery == "" { - log.Info("ddlQuery is empty") +func (w *MysqlWriter) queryDDLJobs(dbNameInDDLJob, tableNameInDDLJob string) (time.Time, bool) { + if dbNameInDDLJob == "" && tableNameInDDLJob == "" { + log.Info("tableNameInDDLJob and dbNameInDDLJob both are nil") return time.Time{}, false } // query the ddl_jobs table to find whether the ddl is executed - query := fmt.Sprintf(queryDDLJobs, ddlQuery) + query := fmt.Sprintf(queryDDLJobs, dbNameInDDLJob, tableNameInDDLJob) log.Info("query the info from ddl jobs", zap.String("query", query)) start := time.Now() @@ -411,7 +417,7 @@ func (w *MysqlWriter) queryDDLJobs(ddlQuery string) (time.Time, bool) { } return createdTime, true } - log.Debug("no ddl job item", zap.Any("query", ddlQuery)) + log.Debug("no ddl job item", zap.Any("tableNameInDDLJob", tableNameInDDLJob), zap.Any("dbNameInDDLJob", dbNameInDDLJob)) return time.Time{}, false } @@ -553,7 +559,8 @@ func (w *MysqlWriter) createDDLTsTable() error { ddl_ts varchar(18), table_id bigint(21), finished bool, - query varchar(1024), + table_name_in_ddl_job varchar(1024), + db_name_in_ddl_job varchar(1024), is_syncpoint bool, created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, INDEX (ticdc_cluster_id, changefeed, table_id),