Skip to content

Commit

Permalink
disttask: fix service_scope replaced (pingcap#49246)
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Dec 12, 2023
1 parent ba55c98 commit 631ce09
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
20 changes: 10 additions & 10 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,31 +599,31 @@ func TestDistFrameworkMeta(t *testing.T) {
defer pool.Close()
ctx := context.Background()
ctx = util.WithInternalSourceType(ctx, "table_test")

require.NoError(t, sm.StartManager(ctx, ":4000", "background"))
require.NoError(t, sm.StartManager(ctx, ":4001", ""))
// will be replaced by below one
require.NoError(t, sm.StartManager(ctx, ":4002", "background"))
// won't be replaced by below one
require.NoError(t, sm.StartManager(ctx, ":4002", ""))
require.NoError(t, sm.StartManager(ctx, ":4003", "background"))

allNodes, err := sm.GetAllNodes(ctx)
require.NoError(t, err)
require.Equal(t, []string{":4000", ":4001", ":4002", ":4003"}, allNodes)

nodes, err := sm.GetManagedNodes(ctx)
nodes, err := sm.GetAllNodes(ctx)
require.NoError(t, err)
require.Equal(t, []string{":4000", ":4003"}, nodes)
require.Equal(t, []string{":4000", ":4001", ":4002", ":4003"}, nodes)

require.NoError(t, sm.CleanUpMeta(ctx, []string{":4000"}))
nodes, err = sm.GetManagedNodes(ctx)
require.NoError(t, err)
require.Equal(t, []string{":4003"}, nodes)
require.Equal(t, []string{":4002", ":4003"}, nodes)

require.NoError(t, sm.CleanUpMeta(ctx, []string{":4003"}))
nodes, err = sm.GetManagedNodes(ctx)
require.NoError(t, err)
require.Equal(t, []string{":4001", ":4002"}, nodes)
require.Equal(t, []string{":4002"}, nodes)

require.NoError(t, sm.CleanUpMeta(ctx, []string{":4002"}))
nodes, err = sm.GetManagedNodes(ctx)
require.NoError(t, err)
require.Equal(t, []string{":4001"}, nodes)
}

func TestSubtaskHistoryTable(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,9 @@ func (stm *TaskManager) StartSubtask(ctx context.Context, subtaskID int64) error

// StartManager insert the manager information into dist_framework_meta.
func (stm *TaskManager) StartManager(ctx context.Context, tidbID string, role string) error {
_, err := stm.executeSQLWithNewSession(ctx, `replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, tidbID, role)
_, err := stm.executeSQLWithNewSession(ctx, `insert into mysql.dist_framework_meta(host, role, keyspace_id)
SELECT %?, %?,-1
WHERE NOT EXISTS (SELECT 1 FROM mysql.dist_framework_meta WHERE host = %?)`, tidbID, role, tidbID)
return err
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
Expand Down Expand Up @@ -169,6 +170,7 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
logutil.BgLogger().Info("set global var", zap.Uint64("conn", sessionVars.ConnectionID), zap.String("name", name), zap.String("val", showValStr))
if name == variable.TiDBServiceScope {
dom := domain.GetDomain(e.Ctx())
config.GetGlobalConfig().Instance.TiDBServiceScope = valStr
serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID())
_, err = e.Ctx().(sqlexec.SQLExecutor).ExecuteInternal(ctx,
`replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, serverID, valStr)
Expand Down

0 comments on commit 631ce09

Please sign in to comment.