From 7e90ede585d3bb1cd152c8eeff88957839f16bd4 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 26 Sep 2023 13:19:45 +0800 Subject: [PATCH] dm: sync partition DDL even if upstream schema is not equal in optimistic mode (#9789) (#9808) close pingcap/tiflow#9788 --- dm/master/shardddl/optimist_test.go | 2 +- dm/pkg/shardddl/optimism/lock.go | 7 ++++--- dm/pkg/shardddl/optimism/lock_test.go | 5 +++++ dm/tests/shardddl2/run.sh | 22 ++++++++++++++++++++++ 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/dm/master/shardddl/optimist_test.go b/dm/master/shardddl/optimist_test.go index f996b39144c..469f413e700 100644 --- a/dm/master/shardddl/optimist_test.go +++ b/dm/master/shardddl/optimist_test.go @@ -759,7 +759,7 @@ func (t *testOptimistSuite) TestOptimistLockConflict() { case <-time.After(watchTimeout): t.T().Fatal("timeout") case op3 := <-opCh: - require.Equal(t.T(), []string{}, op3.DDLs) + require.Equal(t.T(), DDLs1, op3.DDLs) require.Equal(t.T(), optimism.ConflictNone, op3.ConflictStage) } cancel2() diff --git a/dm/pkg/shardddl/optimism/lock.go b/dm/pkg/shardddl/optimism/lock.go index 2e148083e02..06f3df3413a 100644 --- a/dm/pkg/shardddl/optimism/lock.go +++ b/dm/pkg/shardddl/optimism/lock.go @@ -848,7 +848,7 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab // Normal DDL if tableErr == nil { - log.L().Debug("receive a normal DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable)) + log.L().Info("receive a normal DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable)) oldJoined, oldErr := l.joinNormalTables() l.tables[source][schema][table] = postTable @@ -883,7 +883,8 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab // oldJoined != newJoined // postTable == oldJoined (CREATE TABLE) // prevTable < postTable - return (joinedErr != nil || joinedCmp != 0) || (err2 == nil && cmp == 0) || tableCmp < 0, ConflictNone + // prevTable == postTable(Partition/Sequence) + return (joinedErr != nil || joinedCmp != 0) || (err2 == nil && cmp == 0) || tableCmp <= 0, ConflictNone } } @@ -920,7 +921,7 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab return true, ConflictNone } - log.L().Debug("conflict hasn't been resolved", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable)) + log.L().Info("conflict hasn't been resolved", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable)) return false, ConflictSkipWaitRedirect } diff --git a/dm/pkg/shardddl/optimism/lock_test.go b/dm/pkg/shardddl/optimism/lock_test.go index e4a50a69659..063b53b6e57 100644 --- a/dm/pkg/shardddl/optimism/lock_test.go +++ b/dm/pkg/shardddl/optimism/lock_test.go @@ -2738,6 +2738,11 @@ func (t *testLock) TestTrySyncForOneDDL(c *C) { c.Assert(schemaChanged, IsTrue) c.Assert(conflictStage, Equals, ConflictNone) + // check create partition, no changed since https://github.com/pingcap/tidb-tools/blob/d671b0840063bc2532941f02e02e12627402844c/pkg/schemacmp/table.go#L251 + schemaChanged, conflictStage = l.trySyncForOneDDL(source, schema, table1, t0, t1) + c.Assert(schemaChanged, IsTrue) + c.Assert(conflictStage, Equals, ConflictNone) + // check alter table drop column schemaChanged, conflictStage = l.trySyncForOneDDL(source, schema, table2, t0, t2) c.Assert(schemaChanged, IsFalse) diff --git a/dm/tests/shardddl2/run.sh b/dm/tests/shardddl2/run.sh index e744700e566..1be8dde17e0 100644 --- a/dm/tests/shardddl2/run.sh +++ b/dm/tests/shardddl2/run.sh @@ -507,6 +507,27 @@ function DM_DropAddColumn() { done } +function DM_ADD_DROP_PARTITIONS_CASE() { + run_sql_source1 "insert into ${shardddl1}.${tb1} values(1);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(2);" + run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,3);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(4);" + + run_sql_source1 "ALTER TABLE ${shardddl1}.${tb1} ADD PARTITION (partition p1 VALUES LESS THAN (10000))" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.partitions WHERE TABLE_SCHEMA='${shardddl}' AND TABLE_NAME = '${tb}' AND PARTITION_NAME IS NOT NULL;" "count(1): 2" + + run_sql_source1 "ALTER TABLE ${shardddl1}.${tb1} DROP PARTITION p1;" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.partitions WHERE TABLE_SCHEMA='${shardddl}' AND TABLE_NAME = '${tb}' AND PARTITION_NAME IS NOT NULL;" "count(1): 1" +} + +function DM_ADD_DROP_PARTITIONS() { + run_case ADD_DROP_PARTITIONS "double-source-optimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (100));\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb1} (a int) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (100));\";" \ + "clean_table" "optimistic" +} + function run() { init_cluster init_database @@ -515,6 +536,7 @@ function run() { DM_DROP_COLUMN_ALL_DONE DM_RECOVER_LOCK DM_DropAddColumn + DM_ADD_DROP_PARTITIONS start=36 end=45 except=(042 044 045)