Skip to content

Commit

Permalink
*: support metadata lock (#37393)
Browse files Browse the repository at this point in the history
ref #37275
  • Loading branch information
wjhuang2016 authored Sep 18, 2022
1 parent 181fb8e commit c4638b7
Show file tree
Hide file tree
Showing 70 changed files with 2,166 additions and 243 deletions.
2 changes: 1 addition & 1 deletion ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ go_library(

go_test(
name = "ddl_test",
timeout = "moderate",
timeout = "long",
srcs = [
"attributes_sql_test.go",
"callback_test.go",
Expand Down
1 change: 1 addition & 0 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,7 @@ func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set global tidb_enable_amend_pessimistic_txn = ON")
defer tk.MustExec("set global tidb_enable_amend_pessimistic_txn = OFF")

Expand Down
23 changes: 23 additions & 0 deletions ddl/concurrentddltest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "concurrentddltest_test",
timeout = "short",
srcs = [
"main_test.go",
"switch_test.go",
],
flaky = True,
deps = [
"//config",
"//ddl",
"//kv",
"//meta",
"//testkit",
"//testkit/testsetup",
"//util",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
],
)
44 changes: 44 additions & 0 deletions ddl/concurrentddltest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrentddltest

import (
"testing"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()

config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

ddl.SetWaitTimeWhenErrorOccurred(time.Microsecond)

opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}

goleak.VerifyTestMain(m, opts...)
}
139 changes: 139 additions & 0 deletions ddl/concurrentddltest/switch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrentddltest

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestConcurrentDDLSwitch(t *testing.T) {
store := testkit.CreateMockStore(t)

type table struct {
columnIdx int
indexIdx int
}

var tables []*table
tblCount := 20
for i := 0; i < tblCount; i++ {
tables = append(tables, &table{1, 0})
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt=1")
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size=32")

for i := range tables {
tk.MustExec(fmt.Sprintf("create table t%d (col0 int) partition by range columns (col0) ("+
"partition p1 values less than (100), "+
"partition p2 values less than (300), "+
"partition p3 values less than (500), "+
"partition p4 values less than (700), "+
"partition p5 values less than (1000), "+
"partition p6 values less than maxvalue);",
i))
for j := 0; j < 1000; j++ {
tk.MustExec(fmt.Sprintf("insert into t%d values (%d)", i, j))
}
}

ddls := make([]string, 0, tblCount)
ddlCount := 100
for i := 0; i < ddlCount; i++ {
tblIdx := rand.Intn(tblCount)
if rand.Intn(2) == 0 {
ddls = append(ddls, fmt.Sprintf("alter table t%d add index idx%d (col0)", tblIdx, tables[tblIdx].indexIdx))
tables[tblIdx].indexIdx++
} else {
ddls = append(ddls, fmt.Sprintf("alter table t%d add column col%d int", tblIdx, tables[tblIdx].columnIdx))
tables[tblIdx].columnIdx++
}
}

c := atomic.NewInt32(0)
ch := make(chan struct{})
go func() {
var wg util.WaitGroupWrapper
for i := range ddls {
wg.Add(1)
go func(idx int) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(ddls[idx])
c.Add(1)
wg.Done()
}(i)
}
wg.Wait()
ch <- struct{}{}
}()

ticker := time.NewTicker(time.Second)
count := 0
done := false
for !done {
select {
case <-ch:
done = true
case <-ticker.C:
var b bool
var err error
err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error {
b, err = meta.NewMeta(txn).IsConcurrentDDL()
return err
})
require.NoError(t, err)
rs, err := testkit.NewTestKit(t, store).Exec(fmt.Sprintf("set @@global.tidb_enable_concurrent_ddl=%t", !b))
if rs != nil {
require.NoError(t, rs.Close())
}
if err == nil {
count++
if b {
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0"))
tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0"))
}
}
}
}

require.Equal(t, int32(ddlCount), c.Load())
require.Greater(t, count, 0)

tk = testkit.NewTestKit(t, store)
tk.MustExec("use test")
for i, tbl := range tables {
tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx)))
tk.MustExec(fmt.Sprintf("admin check table t%d", i))
for j := 0; j < tbl.indexIdx; j++ {
tk.MustExec(fmt.Sprintf("admin check index t%d idx%d", i, j))
}
}
}
2 changes: 2 additions & 0 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
ReorgTableID = meta.MaxInt48 - 2
// HistoryTableID is the table ID of `tidb_ddl_history`.
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
Expand Down
15 changes: 2 additions & 13 deletions ddl/db_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -129,20 +127,10 @@ func TestIndexOnCacheTable(t *testing.T) {
}

func TestAlterTableCache(t *testing.T) {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
session.SetSchemaLease(600 * time.Millisecond)
session.DisableStats4Test()
dom, err := session.BootstrapSession(store)
require.NoError(t, err)
store, dom := testkit.CreateMockStoreAndDomain(t)

dom.SetStatsUpdating(true)

t.Cleanup(func() {
dom.Close()
err := store.Close()
require.NoError(t, err)
})
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)

Expand All @@ -157,6 +145,7 @@ func TestAlterTableCache(t *testing.T) {
checkTableCacheStatus(t, tk, "test", "t1", model.TableCacheStatusEnable)
tk.MustExec("alter table t1 nocache")
tk.MustExec("drop table if exists t1")
tk.MustExec("set global tidb_enable_metadata_lock=0")
/*Test can't skip schema checker*/
tk.MustExec("drop table if exists t1,t2")
tk.MustExec("CREATE TABLE t1 (a int)")
Expand Down
2 changes: 2 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3324,6 +3324,7 @@ func TestPartitionErrorCode(t *testing.T) {
// Reduce the impact on DML when executing partition DDL
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("set global tidb_enable_metadata_lock=0")
tk1.MustExec("drop table if exists t;")
tk1.MustExec(`create table t(id int primary key)
partition by hash(id) partitions 4;`)
Expand Down Expand Up @@ -3485,6 +3486,7 @@ func TestCommitWhenSchemaChange(t *testing.T) {
})
store := testkit.CreateMockStoreWithSchemaLease(t, time.Second)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set @@global.tidb_max_delta_schema_count= 4096")
tk.MustExec("use test")
tk.MustExec(`create table schema_change (a int, b timestamp)
Expand Down
9 changes: 5 additions & 4 deletions ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestAddNotNullColumnWhileInsertOnDupUpdate(t *testing.T) {
}

func TestTransactionOnAddDropColumn(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, time.Microsecond*500)
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_max_delta_schema_count= 4096")
tk.MustExec("use test")
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestAlterTableWithValidation(t *testing.T) {
}

func TestBatchCreateTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, time.Microsecond*500)
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tables_1")
Expand Down Expand Up @@ -600,9 +600,10 @@ func TestWriteLocal(t *testing.T) {
}

func TestLockTables(t *testing.T) {
store := testkit.CreateMockStoreWithSchemaLease(t, time.Microsecond*500)
store := testkit.CreateMockStore(t)
setTxnTk := testkit.NewTestKit(t, store)
setTxnTk.MustExec("set global tidb_txn_mode=''")
setTxnTk.MustExec("set global tidb_enable_metadata_lock=0")
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1,t2")
Expand Down Expand Up @@ -834,7 +835,7 @@ func TestDDLWithInvalidTableInfo(t *testing.T) {
}

func TestAddColumn2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, time.Microsecond*500)
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
Expand Down
Loading

0 comments on commit c4638b7

Please sign in to comment.