Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema (ticdc): support exchange partition #7118

Merged
merged 20 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 91 additions & 3 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,18 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
if err != nil {
return errors.Trace(err)
}
case timodel.ActionTruncateTablePartition, timodel.ActionAddTablePartition, timodel.ActionDropTablePartition:
case timodel.ActionTruncateTablePartition,
timodel.ActionAddTablePartition,
timodel.ActionDropTablePartition:
err := s.inner.updatePartition(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
case timodel.ActionExchangeTablePartition:
err := s.inner.exchangePartition(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down Expand Up @@ -892,10 +899,91 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er
}
}
s.currentTs = currentTs
// TODO: is it necessary to print changes detailly?

log.Debug("adjust partition success",
zap.String("schema", tbInfo.TableName.Schema),
zap.String("table", tbInfo.TableName.Table))
zap.String("table", tbInfo.TableName.Table),
zap.Any("partitions", newPi.Definitions),
)
return nil
}

// exchangePartition find the partition's id in the old table info of targetTable,
// and find the sourceTable's id in the new table info of targetTable.
// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot.
// Finally, update both the targetTable's info and the sourceTable's info in snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
var sourceTable *model.TableInfo
oldTable, ok := s.physicalTableByID(targetTable.ID)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
}

oldPartitions := oldTable.GetPartitionInfo()
if oldPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", oldTable.ID)
}

newPartitions := targetTable.GetPartitionInfo()
if newPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", targetTable.ID)
}

oldIDs := make(map[int64]struct{}, len(oldPartitions.Definitions))
for _, p := range oldPartitions.Definitions {
oldIDs[p.ID] = struct{}{}
}

newIDs := make(map[int64]struct{}, len(oldPartitions.Definitions))
for _, p := range newPartitions.Definitions {
newIDs[p.ID] = struct{}{}
}

// 1. find the source table info
var diff []int64
for id := range newIDs {
if _, ok := oldIDs[id]; !ok {
diff = append(diff, id)
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}
sourceTable, ok = s.physicalTableByID(diff[0])
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
}

// 3.find the exchanged partition info
diff = diff[:0]
for id := range oldIDs {
if _, ok := newIDs[id]; !ok {
diff = append(diff, id)
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}

exchangedPartitionID := diff[0]
// 4.update the targetTable
s.updatePartition(targetTable, currentTS)

newSourceTable := sourceTable.Clone()
// 5.update the sourceTable
s.dropTable(sourceTable.ID, currentTS)
newSourceTable.ID = exchangedPartitionID
s.createTable(newSourceTable, currentTS)

log.Info("handle exchange partition success",
zap.String("sourceTable", sourceTable.TableName.String()),
zap.Int64("exchangedPartition", exchangedPartitionID),
zap.String("targetTable", targetTable.TableName.String()),
zap.Any("partition", targetTable.GetPartitionInfo().Definitions))
return nil
}

Expand Down
48 changes: 48 additions & 0 deletions cdc/entry/schema/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,54 @@ func TestUpdatePartition(t *testing.T) {
require.True(t, snap2.IsIneligibleTableID(11+65536*2))
}

func TestExchangePartition(t *testing.T) {
var targetTb, sourceTb *model.TableInfo

snap := NewEmptySnapshot(false)
require.Nil(t, snap.inner.createSchema(newDBInfo(1), 100))

// exchange partition fails if the target table is not a partitioned table.
targetTbID := int64(11)
targetTb = newTbInfo(1, "DB_1", targetTbID)
targetTb.Partition = nil
require.Nil(t, snap.inner.createTable(targetTb, 110))
require.Error(t, snap.inner.exchangePartition(newTbInfo(1, "DB_1", 11), 120))
require.Nil(t, snap.inner.dropTable(targetTbID, 125))

// prepare the target table.
targetTb = newTbInfo(1, "DB_1", targetTbID)
p1ID := int64(11 + 65536*1)
p2ID := int64(11 + 65536*2)
targetTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: p1ID}
targetTb.Partition.Definitions = append(targetTb.Partition.Definitions, timodel.PartitionDefinition{ID: p2ID})
// update the target table to a partition table.
require.Nil(t, snap.inner.createTable(targetTb, 140))

// create source table.
sourceTbID := int64(12)
sourceTb = newTbInfo(1, "DB_1", sourceTbID)
require.Nil(t, snap.inner.createTable(sourceTb, 150))

// exchange partition, p1ID should be exchange by sourceTbID.
exchangedTargetTb := newTbInfo(1, "DB_1", targetTbID)
exchangedTargetTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: sourceTbID}
exchangedTargetTb.Partition.Definitions = append(exchangedTargetTb.Partition.Definitions, timodel.PartitionDefinition{ID: p2ID})
require.Nil(t, snap.inner.exchangePartition(exchangedTargetTb, 160))

// make sure we can use the exchanged source table's id to get the target table info,
// since the source table has become a partition of target table.
tarInfo1, _ := snap.PhysicalTableByID(targetTbID)
require.Equal(t, tarInfo1.Partition.Definitions[0].ID, sourceTbID)
tarInfo2, ok := snap.PhysicalTableByID(sourceTbID)
require.True(t, ok)
require.Equal(t, tarInfo1.Name, tarInfo2.Name)

// make sure we can use the exchanged partition's id to get the source table info.
exchangedSourceTb, ok := snap.PhysicalTableByID(p1ID)
require.True(t, ok)
require.Equal(t, sourceTb.Name, exchangedSourceTb.Name)
}

func TestDrop(t *testing.T) {
snap := NewEmptySnapshot(false)

Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *schemaWrap4Owner) AllPhysicalTables() []model.TableID {
if s.allPhysicalTablesCache != nil {
return s.allPhysicalTablesCache
}
// NOTE: it's better to pre-allocate the vector. However in the current implementation
// NOTE: it's better to pre-allocate the vector. However, in the current implementation
// we can't know how many valid tables in the snapshot.
s.allPhysicalTablesCache = make([]model.TableID, 0)
s.schemaSnapshot.IterTables(true, func(tblInfo *model.TableInfo) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ var (
"invalid ddl job(%d)",
errors.RFCCodeText("CDC:ErrInvalidDDLJob"),
)
ErrExchangePartition = errors.Normalize(
"exchange partition failed, %s",
errors.RFCCodeText("CDC:ErrExchangePartition"),
)

// puller related errors
ErrBufferReachLimit = errors.Normalize(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use `consistent_replicate_s3`;
set @@global.tidb_enable_exchange_partition=on;

create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3,p4*/

create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/

create table finish_mark (a int primary key);
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ function run() {
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_s3.USERTABLE2 like consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_s3.USERTABLE2 select * from consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# to ensure row changed events have been replicated to TiCDC
sleep 10
sleep 15

current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
ensure 50 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta
Expand Down
4 changes: 4 additions & 0 deletions tests/integration_tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ ddls=("create database ddl_reentrant" false 'CREATE DATABASE `ddl_reentrant`'
"alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false 'ALTER TABLE `ddl_reentrant`.`t3` ADD PARTITION (PARTITION `p2` VALUES LESS THAN (3000))'
"alter table ddl_reentrant.t3 drop partition p2" false 'ALTER TABLE `ddl_reentrant`.`t3` DROP PARTITION `p2`'
"alter table ddl_reentrant.t3 truncate partition p0" true 'ALTER TABLE `ddl_reentrant`.`t3` TRUNCATE PARTITION `p0`'
"create table ddl_reentrant.t4 (a int primary key, b int)" false 'CREATE TABLE `ddl_reentrant`.`t4` (`a` INT PRIMARY KEY,`b` INT)'
"alter table ddl_reentrant.t3 exchange partition p0 with table ddl_reentrant.t4" true 'ALTER TABLE `ddl_reentrant`.`t3` EXCHANGE PARTITION `p0` WITH TABLE `ddl_reentrant`.`t4`'
"create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false 'CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `ddl_reentrant`.`t3_view` AS SELECT `a`,`b` FROM `ddl_reentrant`.`t3`'
"drop view ddl_reentrant.t3_view" false 'DROP VIEW `ddl_reentrant`.`t3_view`'
"alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER TABLE `ddl_reentrant`.`t3` CHARACTER SET UTF8MB4 COLLATE UTF8MB4_UNICODE_CI'
Expand Down Expand Up @@ -130,8 +132,10 @@ function run() {
if [[ $tidb_build_branch =~ master ]]; then
# https://github.com/pingcap/tidb/pull/21533 disables multi_schema change
# feature by default, turn it on first
run_sql "set @@global.tidb_enable_exchange_partition=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "set global tidb_enable_change_multi_schema = on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# This must be set before cdc server starts
run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "set global tidb_enable_change_multi_schema = on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
# TiDB global variables cache 2 seconds at most
sleep 2
Expand Down
6 changes: 5 additions & 1 deletion tests/integration_tests/partition_table/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
drop database if exists `partition_table`;
set @@global.tidb_enable_exchange_partition=on;
create database `partition_table`;
use `partition_table`;

Expand All @@ -14,12 +15,15 @@ insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3*/
insert into t1 values (25),(29),(35); /*these values in p3,p4*/
alter table t1 truncate partition p0;
alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/

create table finish_mark (a int primary key);
2 changes: 2 additions & 0 deletions tests/integration_tests/partition_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function run() {

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

Expand All @@ -34,6 +35,7 @@ function run() {
check_table_exists partition_table.t ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.t2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

Expand Down