Skip to content

Commit

Permalink
schema (ticdc): support exchange partition (#7118)
Browse files Browse the repository at this point in the history
ref #639, close #6322
  • Loading branch information
asddongmen authored Sep 21, 2022
1 parent 8a29142 commit ec49977
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 10 deletions.
109 changes: 103 additions & 6 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,18 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
if err != nil {
return errors.Trace(err)
}
case timodel.ActionTruncateTablePartition, timodel.ActionAddTablePartition, timodel.ActionDropTablePartition:
case timodel.ActionTruncateTablePartition,
timodel.ActionAddTablePartition,
timodel.ActionDropTablePartition:
err := s.inner.updatePartition(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
case timodel.ActionExchangeTablePartition:
err := s.inner.exchangePartition(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down Expand Up @@ -706,7 +713,7 @@ func (s *snapshot) dropSchema(id int64, currentTs uint64) error {
return nil
}

// Create a new schema in the snapshot. `dbInfo` will be deep copied.
// Create a new schema in the snapshot. `dbInfo` will be deeply copied.
func (s *snapshot) createSchema(dbInfo *timodel.DBInfo, currentTs uint64) error {
x, ok := s.schemaByID(dbInfo.ID)
if ok {
Expand All @@ -721,7 +728,7 @@ func (s *snapshot) createSchema(dbInfo *timodel.DBInfo, currentTs uint64) error
return nil
}

// Replace a schema. dbInfo will be deep copied.
// Replace a schema. dbInfo will be deeply copied.
// Callers should ensure `dbInfo` information not conflict with other schemas.
func (s *snapshot) replaceSchema(dbInfo *timodel.DBInfo, currentTs uint64) error {
old, ok := s.schemaByID(dbInfo.ID)
Expand Down Expand Up @@ -794,7 +801,7 @@ func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs ui
return
}

// Create a new table in the snapshot. `tbInfo` will be deep copied.
// Create a new table in the snapshot. `tbInfo` will be deeply copied.
func (s *snapshot) createTable(tbInfo *model.TableInfo, currentTs uint64) error {
if _, ok := s.schemaByID(tbInfo.SchemaID); !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table's schema(%d)", tbInfo.SchemaID)
Expand Down Expand Up @@ -892,10 +899,100 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er
}
}
s.currentTs = currentTs
// TODO: is it necessary to print changes detailly?

log.Debug("adjust partition success",
zap.String("schema", tbInfo.TableName.Schema),
zap.String("table", tbInfo.TableName.Table))
zap.String("table", tbInfo.TableName.Table),
zap.Any("partitions", newPi.Definitions),
)
return nil
}

// exchangePartition find the partition's id in the old table info of targetTable,
// and find the sourceTable's id in the new table info of targetTable.
// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot.
// Finally, update both the targetTable's info and the sourceTable's info in snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
var sourceTable *model.TableInfo
oldTable, ok := s.physicalTableByID(targetTable.ID)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
}

oldPartitions := oldTable.GetPartitionInfo()
if oldPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", oldTable.ID)
}

newPartitions := targetTable.GetPartitionInfo()
if newPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", targetTable.ID)
}

oldIDs := make(map[int64]struct{}, len(oldPartitions.Definitions))
for _, p := range oldPartitions.Definitions {
oldIDs[p.ID] = struct{}{}
}

newIDs := make(map[int64]struct{}, len(oldPartitions.Definitions))
for _, p := range newPartitions.Definitions {
newIDs[p.ID] = struct{}{}
}

// 1. find the source table info
var diff []int64
for id := range newIDs {
if _, ok := oldIDs[id]; !ok {
diff = append(diff, id)
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}
sourceTable, ok = s.physicalTableByID(diff[0])
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
}

// 3.find the exchanged partition info
diff = diff[:0]
for id := range oldIDs {
if _, ok := newIDs[id]; !ok {
diff = append(diff, id)
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}

exchangedPartitionID := diff[0]
// 4.update the targetTable
err := s.updatePartition(targetTable, currentTS)
if err != nil {
return errors.Trace(err)
}

newSourceTable := sourceTable.Clone()
// 5.update the sourceTable
err = s.dropTable(sourceTable.ID, currentTS)
if err != nil {
return errors.Trace(err)
}
newSourceTable.ID = exchangedPartitionID
err = s.createTable(newSourceTable, currentTS)
if err != nil {
return errors.Trace(err)
}

log.Info("handle exchange partition success",
zap.String("sourceTable", sourceTable.TableName.String()),
zap.Int64("exchangedPartition", exchangedPartitionID),
zap.String("targetTable", targetTable.TableName.String()),
zap.Any("partition", targetTable.GetPartitionInfo().Definitions))
return nil
}

Expand Down
48 changes: 48 additions & 0 deletions cdc/entry/schema/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,54 @@ func TestUpdatePartition(t *testing.T) {
require.True(t, snap2.IsIneligibleTableID(11+65536*2))
}

func TestExchangePartition(t *testing.T) {
var targetTb, sourceTb *model.TableInfo

snap := NewEmptySnapshot(false)
require.Nil(t, snap.inner.createSchema(newDBInfo(1), 100))

// exchange partition fails if the target table is not a partitioned table.
targetTbID := int64(11)
targetTb = newTbInfo(1, "DB_1", targetTbID)
targetTb.Partition = nil
require.Nil(t, snap.inner.createTable(targetTb, 110))
require.Error(t, snap.inner.exchangePartition(newTbInfo(1, "DB_1", 11), 120))
require.Nil(t, snap.inner.dropTable(targetTbID, 125))

// prepare the target table.
targetTb = newTbInfo(1, "DB_1", targetTbID)
p1ID := int64(11 + 65536*1)
p2ID := int64(11 + 65536*2)
targetTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: p1ID}
targetTb.Partition.Definitions = append(targetTb.Partition.Definitions, timodel.PartitionDefinition{ID: p2ID})
// update the target table to a partition table.
require.Nil(t, snap.inner.createTable(targetTb, 140))

// create source table.
sourceTbID := int64(12)
sourceTb = newTbInfo(1, "DB_1", sourceTbID)
require.Nil(t, snap.inner.createTable(sourceTb, 150))

// exchange partition, p1ID should be exchange by sourceTbID.
exchangedTargetTb := newTbInfo(1, "DB_1", targetTbID)
exchangedTargetTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: sourceTbID}
exchangedTargetTb.Partition.Definitions = append(exchangedTargetTb.Partition.Definitions, timodel.PartitionDefinition{ID: p2ID})
require.Nil(t, snap.inner.exchangePartition(exchangedTargetTb, 160))

// make sure we can use the exchanged source table's id to get the target table info,
// since the source table has become a partition of target table.
tarInfo1, _ := snap.PhysicalTableByID(targetTbID)
require.Equal(t, tarInfo1.Partition.Definitions[0].ID, sourceTbID)
tarInfo2, ok := snap.PhysicalTableByID(sourceTbID)
require.True(t, ok)
require.Equal(t, tarInfo1.Name, tarInfo2.Name)

// make sure we can use the exchanged partition's id to get the source table info.
exchangedSourceTb, ok := snap.PhysicalTableByID(p1ID)
require.True(t, ok)
require.Equal(t, sourceTb.Name, exchangedSourceTb.Name)
}

func TestDrop(t *testing.T) {
snap := NewEmptySnapshot(false)

Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *schemaWrap4Owner) AllPhysicalTables() []model.TableID {
if s.allPhysicalTablesCache != nil {
return s.allPhysicalTablesCache
}
// NOTE: it's better to pre-allocate the vector. However in the current implementation
// NOTE: it's better to pre-allocate the vector. However, in the current implementation
// we can't know how many valid tables in the snapshot.
s.allPhysicalTablesCache = make([]model.TableID, 0)
s.schemaSnapshot.IterTables(true, func(tblInfo *model.TableInfo) {
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ error = '''
eventfeed returns event error
'''

["CDC:ErrExchangePartition"]
error = '''
exchange partition failed, %s
'''

["CDC:ErrExecDDLFailed"]
error = '''
exec DDL failed
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ var (
"invalid ddl job(%d)",
errors.RFCCodeText("CDC:ErrInvalidDDLJob"),
)
ErrExchangePartition = errors.Normalize(
"exchange partition failed, %s",
errors.RFCCodeText("CDC:ErrExchangePartition"),
)

// puller related errors
ErrBufferReachLimit = errors.Normalize(
Expand Down
2 changes: 2 additions & 0 deletions tests/integration_tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ EOF
run_cdc_cli unsafe reset --no-confirm --pd=$pd_addr
REGION_ID=$(pd-ctl -u=$pd_addr region | jq '.regions[0].id')
TS=$(cdc cli tso query --pd=$pd_addr)
# wait for owner online
sleep 3
run_cdc_cli unsafe resolve-lock --region=$REGION_ID
run_cdc_cli unsafe resolve-lock --region=$REGION_ID --ts=$TS

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ check-struct-only = false

target-instance = "tidb0"

target-check-tables = ["consistent_replicate_s3.usertable*"]
target-check-tables = ["consistent_replicate_s3.usertable*","consistent_replicate_s3.t*"]

[data-sources]
[data-sources.mysql1]
Expand Down
13 changes: 13 additions & 0 deletions tests/integration_tests/consistent_replicate_s3/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use `consistent_replicate_s3`;
set @@global.tidb_enable_exchange_partition=on;

create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3,p4*/

create table t2 (a int primary key);


9 changes: 8 additions & 1 deletion tests/integration_tests/consistent_replicate_s3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ function run() {
start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR
run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

Expand All @@ -78,8 +79,11 @@ function run() {
run_sql "CREATE DATABASE consistent_replicate_s3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_s3
run_sql "CREATE table consistent_replicate_s3.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_table_exists "consistent_replicate_s3.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "consistent_replicate_s3.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_table_exists "consistent_replicate_s3.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
Expand All @@ -88,10 +92,13 @@ function run() {
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sinkv2/eventsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_s3.USERTABLE2 like consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_s3.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_s3.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_s3.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_s3.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_s3.USERTABLE2 select * from consistent_replicate_s3.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# to ensure row changed events have been replicated to TiCDC
sleep 10
sleep 20

current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
ensure 50 check_resolved_ts $changefeed_id $current_tso $WORK_DIR/redo/meta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ function run() {
run_sql "CREATE table consistent_replicate_s3.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_table_exists "consistent_replicate_s3.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "consistent_replicate_s3.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
Expand Down
4 changes: 4 additions & 0 deletions tests/integration_tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ ddls=("create database ddl_reentrant" false 'CREATE DATABASE `ddl_reentrant`'
"alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false 'ALTER TABLE `ddl_reentrant`.`t3` ADD PARTITION (PARTITION `p2` VALUES LESS THAN (3000))'
"alter table ddl_reentrant.t3 drop partition p2" false 'ALTER TABLE `ddl_reentrant`.`t3` DROP PARTITION `p2`'
"alter table ddl_reentrant.t3 truncate partition p0" true 'ALTER TABLE `ddl_reentrant`.`t3` TRUNCATE PARTITION `p0`'
"create table ddl_reentrant.t4 (a int primary key, b int)" false 'CREATE TABLE `ddl_reentrant`.`t4` (`a` INT PRIMARY KEY,`b` INT)'
"alter table ddl_reentrant.t3 exchange partition p0 with table ddl_reentrant.t4" true 'ALTER TABLE `ddl_reentrant`.`t3` EXCHANGE PARTITION `p0` WITH TABLE `ddl_reentrant`.`t4`'
"create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false 'CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `ddl_reentrant`.`t3_view` AS SELECT `a`,`b` FROM `ddl_reentrant`.`t3`'
"drop view ddl_reentrant.t3_view" false 'DROP VIEW `ddl_reentrant`.`t3_view`'
"alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER TABLE `ddl_reentrant`.`t3` CHARACTER SET UTF8MB4 COLLATE UTF8MB4_UNICODE_CI'
Expand Down Expand Up @@ -130,8 +132,10 @@ function run() {
if [[ $tidb_build_branch =~ master ]]; then
# https://github.com/pingcap/tidb/pull/21533 disables multi_schema change
# feature by default, turn it on first
run_sql "set @@global.tidb_enable_exchange_partition=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "set global tidb_enable_change_multi_schema = on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# This must be set before cdc server starts
run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "set global tidb_enable_change_multi_schema = on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
# TiDB global variables cache 2 seconds at most
sleep 2
Expand Down
6 changes: 5 additions & 1 deletion tests/integration_tests/partition_table/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
drop database if exists `partition_table`;
set @@global.tidb_enable_exchange_partition=on;
create database `partition_table`;
use `partition_table`;

Expand All @@ -14,12 +15,15 @@ insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3*/
insert into t1 values (25),(29),(35); /*these values in p3,p4*/
alter table t1 truncate partition p0;
alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/

create table finish_mark (a int primary key);
2 changes: 2 additions & 0 deletions tests/integration_tests/partition_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function run() {

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

Expand All @@ -34,6 +35,7 @@ function run() {
check_table_exists partition_table.t ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.t2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

Expand Down

0 comments on commit ec49977

Please sign in to comment.