Skip to content

Commit

Permalink
addindex/disttask: adjust add index task concurrency & add check when…
Browse files Browse the repository at this point in the history
… submit task (pingcap#49403)

ref pingcap#49008
  • Loading branch information
D3Hunter authored Dec 13, 2023
1 parent ced8af2 commit 3379450
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/collate",
"//pkg/util/cpu",
"//pkg/util/dbterror",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/domainutil",
Expand Down
8 changes: 7 additions & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
Expand Down Expand Up @@ -2115,6 +2116,11 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
}

job := reorgInfo.Job
workerCntLimit := int(variable.GetDDLReorgWorkerCounter())
concurrency := min(workerCntLimit, cpu.GetCPUCount())
logutil.BgLogger().Info("adjusted add-index task concurrency",
zap.Int("worker-cnt", workerCntLimit), zap.Int("task-concurrency", concurrency),
zap.String("task-key", taskKey))
taskMeta := &BackfillTaskMeta{
Job: *reorgInfo.Job.Clone(),
EleIDs: elemIDs,
Expand All @@ -2129,7 +2135,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {

g.Go(func() error {
defer close(done)
err := handle.SubmitAndWaitTask(ctx, taskKey, taskType, distPhysicalTableConcurrency, metaData)
err := handle.SubmitAndWaitTask(ctx, taskKey, taskType, concurrency, metaData)
failpoint.Inject("pauseAfterDistTaskFinished", func() {
MockDMLExecutionOnTaskFinished()
})
Expand Down
32 changes: 15 additions & 17 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@ import (
// ┌────────┐
// ┌───────────│resuming│◄────────┐
// │ └────────┘ │
// │ ┌───────┐ ┌──┴───┐
// │ ┌────────►│pausing├──────►│paused│
// │ │ └───────┘ └──────┘
// ▼ │
// ┌──────┐ ┌───┴───┐ ┌────────┐
// ┌──────┐ │ ┌───────┐ ┌──┴───┐
// │failed│ │ ┌────────►│pausing├──────►│paused│
// └──────┘ │ │ └───────┘ └──────┘
// ▼ │
// ┌──────┐ ┌───┴───┐ ┌────────┐
// │pending├────►│running├────►│succeed │
// └──┬────┘ └───┬───┘ └────────┘
// ▼ │ ┌──────────┐
// ┌──────┐ ├────────►│cancelling│
// │failed│ │ └────┬─────┘
// └──────┘ │ ▼
// │ ┌─────────┐ ┌────────┐
// └────────►│reverting├────►│reverted│
// └────┬────┘ └────────┘
// │ ┌─────────────┐
// └─────────►│revert_failed│
// └─────────────┘
// └──┬────┘ └──┬┬───┘ └────────┘
// │ ││ ┌─────────┐ ┌────────┐
// │ │└────────►│reverting├────►│reverted│
// │ ▼ └────┬────┘ └────────┘
// │ ┌──────────┐ ▲ │ ┌─────────────┐
// └─────────►│cancelling├────┘ └─────────►│revert_failed│
// └──────────┘ └─────────────┘
// 1. succeed: pending -> running -> succeed
// 2. failed: pending -> running -> reverting -> reverted/revert_failed, pending -> failed
// 3. canceled: pending -> running -> cancelling -> reverting -> reverted/revert_failed
Expand Down Expand Up @@ -141,7 +137,9 @@ type Task struct {
Step Step
// Priority is the priority of the task; a smaller value means a higher priority.
// The valid range is [1, 1024], and the default is NormalPriority.
Priority int
Priority int
// Concurrency controls the max resource usage of the task, i.e. the max number
// of slots the task can use on each node.
Concurrency int
CreateTime time.Time

Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/parser/terror",
"//pkg/sessionctx",
"//pkg/util/chunk",
"//pkg/util/cpu",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
Expand Down
3 changes: 3 additions & 0 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func TestMain(m *testing.M) {
func TestTaskTable(t *testing.T) {
gm, ctx := testutil.InitTableTest(t)

_, err := gm.CreateTask(ctx, "key1", "test", 999, []byte("test"))
require.ErrorContains(t, err, "task concurrency(999) larger than cpu count")

timeBeforeCreate := time.Unix(time.Now().Unix(), 0)
id, err := gm.CreateTask(ctx, "key1", "test", 4, []byte("test"))
require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
Expand Down Expand Up @@ -215,6 +216,14 @@ func (stm *TaskManager) CreateTask(ctx context.Context, key string, tp proto.Tas

// CreateTaskWithSession adds a new task to task table with session.
func (*TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error) {
cpuCount := cpu.GetCPUCount()
if concurrency > cpuCount {
// the current resource control cannot schedule tasks whose concurrency is
// larger than the cpu count.
// TODO: if we are submitting a task on a node that is not managed by the
// disttask framework, the checked cpu count might not be right.
return 0, errors.Errorf("task concurrency(%d) larger than cpu count(%d)", concurrency, cpuCount)
}
_, err = ExecSQL(ctx, se, `
insert into mysql.tidb_global_task(`+InsertTaskColumns+`)
values (%?, %?, %?, %?, %?, %?, %?, CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())`,
Expand Down
3 changes: 3 additions & 0 deletions tests/realtikvtest/addindextest1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ go_test(
"//pkg/ddl/util/callback",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/errno",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/sessionctx/variable",
"//pkg/store/helper",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/types",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
],
)
24 changes: 24 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package addindextest

import (
"context"
"fmt"
"testing"

"github.com/pingcap/failpoint"
Expand All @@ -23,15 +25,18 @@ import (
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
)

func init() {
Expand All @@ -41,6 +46,13 @@ func init() {
}

func TestAddIndexDistBasic(t *testing.T) {
// mock that we only have 1 cpu; the add-index task should still be scheduled as usual
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", `return(1)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/testSetLastTaskID", `return(true)`))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/testSetLastTaskID"))
})
store := realtikvtest.CreateMockStoreAndSetup(t)
if store.Name() != "TiKV" {
t.Skip("TiKV store only")
Expand All @@ -52,6 +64,9 @@ func TestAddIndexDistBasic(t *testing.T) {
tk.MustExec("use test;")
tk.MustExec(`set global tidb_enable_dist_task=1;`)

bak := variable.GetDDLReorgWorkerCounter()
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 111")
require.Equal(t, int32(111), variable.GetDDLReorgWorkerCounter())
tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 20;")
tk.MustExec("insert into t values (), (), (), (), (), ()")
tk.MustExec("insert into t values (), (), (), (), (), ()")
Expand All @@ -61,6 +76,15 @@ func TestAddIndexDistBasic(t *testing.T) {
tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;")
tk.MustExec("alter table t add index idx(a);")
tk.MustExec("admin check index t idx;")
taskMgr, err := storage.GetTaskManager()
require.NoError(t, err)
ctx := util.WithInternalSourceType(context.Background(), "dispatcher")
task, err := taskMgr.GetTaskByIDWithHistory(ctx, storage.TestLastTaskID.Load())
require.NoError(t, err)
require.Equal(t, 1, task.Concurrency)

tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_worker_cnt = %d", bak))
require.Equal(t, bak, variable.GetDDLReorgWorkerCounter())

tk.MustExec("create table t1(a bigint auto_random primary key);")
tk.MustExec("insert into t1 values (), (), (), (), (), ()")
Expand Down

0 comments on commit 3379450

Please sign in to comment.