diff --git a/pkg/disttask/framework/storage/BUILD.bazel b/pkg/disttask/framework/storage/BUILD.bazel index cda18d01cee1d..81f02d6884ae9 100644 --- a/pkg/disttask/framework/storage/BUILD.bazel +++ b/pkg/disttask/framework/storage/BUILD.bazel @@ -38,10 +38,8 @@ go_test( "//pkg/disttask/framework/proto", "//pkg/disttask/framework/testutil", "//pkg/sessionctx", - "//pkg/testkit", "//pkg/testkit/testsetup", "//pkg/util/sqlexec", - "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index 0dca7f2801953..bb451a87d432b 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -20,14 +20,12 @@ import ( "testing" "time" - "github.com/ngaut/pools" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/testutil" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testsetup" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/require" @@ -45,30 +43,8 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m, opts...) } -func GetResourcePool(t *testing.T) *pools.ResourcePool { - store := testkit.CreateMockStore(t) - - tk := testkit.NewTestKit(t, store) - pool := pools.NewResourcePool(func() (pools.Resource, error) { - return tk.Session(), nil - }, 1, 1, time.Second) - return pool -} - -func GetTaskManager(t *testing.T, pool *pools.ResourcePool) *storage.TaskManager { - manager := storage.NewTaskManager(pool) - storage.SetTaskManager(manager) - manager, err := storage.GetTaskManager() - require.NoError(t, err) - return manager -} - func TestTaskTable(t *testing.T) { - pool := GetResourcePool(t) - gm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") + gm, ctx := testutil.InitTableTest(t) timeBeforeCreate := time.Unix(time.Now().Unix(), 0) id, err := gm.CreateTask(ctx, "key1", "test", 4, []byte("test")) @@ -152,11 +128,7 @@ func TestTaskTable(t *testing.T) { } func TestGetTopUnfinishedTasks(t *testing.T) { - pool := GetResourcePool(t) - gm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") + gm, ctx := testutil.InitTableTest(t) taskStates := []proto.TaskState{ proto.TaskStateSucceed, @@ -221,11 +193,7 @@ func TestGetTopUnfinishedTasks(t *testing.T) { } func TestGetUsedSlotsOnNodes(t *testing.T) { - pool := GetResourcePool(t) - sm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") + sm, ctx := testutil.InitTableTest(t) testutil.InsertSubtask(t, sm, 1, proto.StepOne, "tidb-1", []byte(""), proto.TaskStateRunning, "test", 12) testutil.InsertSubtask(t, sm, 1, proto.StepOne, "tidb-2", []byte(""), proto.TaskStatePending, "test", 12) @@ -242,12 +210,7 @@ func TestGetUsedSlotsOnNodes(t *testing.T) { } func TestSubTaskTable(t *testing.T) { - pool := GetResourcePool(t) - sm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") - + sm, ctx := testutil.InitTableTest(t) timeBeforeCreate := time.Unix(time.Now().Unix(), 0) id, err := sm.CreateTask(ctx, "key1", "test", 4, []byte("test")) require.NoError(t, err) @@ -471,12 +434,7 @@ func TestSubTaskTable(t *testing.T) { } func TestBothTaskAndSubTaskTable(t *testing.T) { - pool := GetResourcePool(t) - sm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") - + sm, ctx := testutil.InitTableTest(t) id, err := sm.CreateTask(ctx, "key1", "test", 4, []byte("test")) require.NoError(t, err) require.Equal(t, int64(1), id) @@ -594,11 +552,8 @@ func TestDistFrameworkMeta(t *testing.T) { defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask")) }() - pool := GetResourcePool(t) - sm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") + sm, ctx := testutil.InitTableTest(t) + require.NoError(t, sm.StartManager(ctx, ":4000", "background")) require.NoError(t, sm.StartManager(ctx, ":4001", "")) require.NoError(t, sm.StartManager(ctx, ":4002", "background")) @@ -627,11 +582,7 @@ func TestDistFrameworkMeta(t *testing.T) { } func TestSubtaskHistoryTable(t *testing.T) { - pool := GetResourcePool(t) - sm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") + sm, ctx := testutil.InitTableTest(t) const ( taskID = 1 @@ -696,11 +647,7 @@ func TestSubtaskHistoryTable(t *testing.T) { } func TestTaskHistoryTable(t *testing.T) { - pool := GetResourcePool(t) - gm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") + gm, ctx := testutil.InitTableTest(t) _, err := gm.CreateTask(ctx, "1", proto.TaskTypeExample, 1, nil) require.NoError(t, err) @@ -742,11 +689,7 @@ func TestTaskHistoryTable(t *testing.T) { } func TestPauseAndResume(t *testing.T) { - pool := GetResourcePool(t) - sm := GetTaskManager(t, pool) - defer pool.Close() - ctx := context.Background() - ctx = util.WithInternalSourceType(ctx, "table_test") + sm, ctx := testutil.InitTableTest(t) testutil.CreateSubTask(t, sm, 1, proto.StepInit, "tidb1", []byte("test"), proto.TaskTypeExample, 11, false) testutil.CreateSubTask(t, sm, 1, proto.StepInit, "tidb1", []byte("test"), proto.TaskTypeExample, 11, false) @@ -776,11 +719,7 @@ func TestPauseAndResume(t *testing.T) { } func TestCancelAndExecIdChanged(t *testing.T) { - pool := GetResourcePool(t) - sm := GetTaskManager(t, pool) - defer pool.Close() - ctx, cancel := context.WithCancel(context.Background()) - ctx = util.WithInternalSourceType(ctx, "table_test") + sm, ctx, cancel := testutil.InitTableTestWithCancel(t) testutil.CreateSubTask(t, sm, 1, proto.StepInit, "tidb1", []byte("test"), proto.TaskTypeExample, 11, false) subtask, err := sm.GetFirstSubtaskInStates(ctx, "tidb1", 1, proto.StepInit, proto.TaskStatePending) diff --git a/pkg/disttask/framework/testutil/BUILD.bazel b/pkg/disttask/framework/testutil/BUILD.bazel index a84f828752684..ed5013dfb4809 100644 --- a/pkg/disttask/framework/testutil/BUILD.bazel +++ b/pkg/disttask/framework/testutil/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "dispatcher_util.go", "disttest_util.go", "executor_util.go", + "table_util.go", "task_util.go", ], importpath = "github.com/pingcap/tidb/pkg/disttask/framework/testutil", @@ -22,6 +23,7 @@ go_library( "//pkg/domain/infosync", "//pkg/sessionctx", "//pkg/testkit", + "@com_github_ngaut_pools//:pools", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//util", diff --git a/pkg/disttask/framework/testutil/table_util.go b/pkg/disttask/framework/testutil/table_util.go new file mode 100644 index 0000000000000..abe22ad1d1228 --- /dev/null +++ b/pkg/disttask/framework/testutil/table_util.go @@ -0,0 +1,64 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutil + +import ( + "context" + "testing" + "time" + + "github.com/ngaut/pools" + "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/util" +) + +// InitTableTest inits needed components for table_test. +func InitTableTest(t *testing.T) (*storage.TaskManager, context.Context) { + pool := getResourcePool(t) + ctx := context.Background() + ctx = util.WithInternalSourceType(ctx, "table_test") + return getTaskManager(t, pool), ctx +} + +// InitTableTestWithCancel inits needed components with context.CancelFunc for table_test. +func InitTableTestWithCancel(t *testing.T) (*storage.TaskManager, context.Context, context.CancelFunc) { + pool := getResourcePool(t) + ctx, cancel := context.WithCancel(context.Background()) + ctx = util.WithInternalSourceType(ctx, "table_test") + return getTaskManager(t, pool), ctx, cancel +} + +func getResourcePool(t *testing.T) *pools.ResourcePool { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return tk.Session(), nil + }, 1, 1, time.Second) + + t.Cleanup(func() { + pool.Close() + }) + return pool +} + +func getTaskManager(t *testing.T, pool *pools.ResourcePool) *storage.TaskManager { + manager := storage.NewTaskManager(pool) + storage.SetTaskManager(manager) + manager, err := storage.GetTaskManager() + require.NoError(t, err) + return manager +}