From 631ce093d21f962fbc4b9a3e14871de42832a07c Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Tue, 12 Dec 2023 11:46:49 +0800 Subject: [PATCH] disttask: fix service_scope replaced (#49246) close pingcap/tidb#49245 --- pkg/disttask/framework/storage/table_test.go | 20 ++++++++++---------- pkg/disttask/framework/storage/task_table.go | 4 +++- pkg/executor/set.go | 2 ++ 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index 3d5cc6ea4ecd4..0dca7f2801953 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -599,31 +599,31 @@ func TestDistFrameworkMeta(t *testing.T) { defer pool.Close() ctx := context.Background() ctx = util.WithInternalSourceType(ctx, "table_test") - require.NoError(t, sm.StartManager(ctx, ":4000", "background")) require.NoError(t, sm.StartManager(ctx, ":4001", "")) - // will be replaced by below one require.NoError(t, sm.StartManager(ctx, ":4002", "background")) + // won't be replaced by below one require.NoError(t, sm.StartManager(ctx, ":4002", "")) require.NoError(t, sm.StartManager(ctx, ":4003", "background")) - allNodes, err := sm.GetAllNodes(ctx) - require.NoError(t, err) - require.Equal(t, []string{":4000", ":4001", ":4002", ":4003"}, allNodes) - - nodes, err := sm.GetManagedNodes(ctx) + nodes, err := sm.GetAllNodes(ctx) require.NoError(t, err) - require.Equal(t, []string{":4000", ":4003"}, nodes) + require.Equal(t, []string{":4000", ":4001", ":4002", ":4003"}, nodes) require.NoError(t, sm.CleanUpMeta(ctx, []string{":4000"})) nodes, err = sm.GetManagedNodes(ctx) require.NoError(t, err) - require.Equal(t, []string{":4003"}, nodes) + require.Equal(t, []string{":4002", ":4003"}, nodes) require.NoError(t, sm.CleanUpMeta(ctx, []string{":4003"})) nodes, err = sm.GetManagedNodes(ctx) require.NoError(t, err) - require.Equal(t, []string{":4001", ":4002"}, nodes) + require.Equal(t, []string{":4002"}, nodes) + + require.NoError(t, sm.CleanUpMeta(ctx, []string{":4002"})) + nodes, err = sm.GetManagedNodes(ctx) + require.NoError(t, err) + require.Equal(t, []string{":4001"}, nodes) } func TestSubtaskHistoryTable(t *testing.T) { diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index b200ef259f6e6..fb95aa4c5691a 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -645,7 +645,9 @@ func (stm *TaskManager) StartSubtask(ctx context.Context, subtaskID int64) error // StartManager insert the manager information into dist_framework_meta. func (stm *TaskManager) StartManager(ctx context.Context, tidbID string, role string) error { - _, err := stm.executeSQLWithNewSession(ctx, `replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, tidbID, role) + _, err := stm.executeSQLWithNewSession(ctx, `insert into mysql.dist_framework_meta(host, role, keyspace_id) + SELECT %?, %?,-1 + WHERE NOT EXISTS (SELECT 1 FROM mysql.dist_framework_meta WHERE host = %?)`, tidbID, role, tidbID) return err } diff --git a/pkg/executor/set.go b/pkg/executor/set.go index 60259a409a2b9..5da9432610ad8 100644 --- a/pkg/executor/set.go +++ b/pkg/executor/set.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" @@ -169,6 +170,7 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres logutil.BgLogger().Info("set global var", zap.Uint64("conn", sessionVars.ConnectionID), zap.String("name", name), zap.String("val", showValStr)) if name == variable.TiDBServiceScope { dom := domain.GetDomain(e.Ctx()) + config.GetGlobalConfig().Instance.TiDBServiceScope = valStr serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID()) _, err = e.Ctx().(sqlexec.SQLExecutor).ExecuteInternal(ctx, `replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, serverID, valStr)