diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 4b9896dfefa..4544c288896 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -36,6 +36,8 @@ const ( groupSettingsPathPrefix = "resource_group/settings" // errNotPrimary is returned when the requested server is not primary. errNotPrimary = "not primary" + // errNotLeader is returned when the requested server is not pd leader. + errNotLeader = "not leader" ) // GroupSettingsPathPrefixBytes is used to watch or get resource groups. @@ -65,7 +67,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) { // gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. func (c *client) gRPCErrorHandler(err error) { - if strings.Contains(err.Error(), errNotPrimary) { + if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) { c.pdSvcDiscovery.ScheduleCheckMemberChanged() } } diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index ad61fe0a3fa..3d208aef2a2 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -48,6 +48,8 @@ const ( // UserKindKey is the key for user kind in keyspace config. UserKindKey = "user_kind" // TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config. + // Note: Config[TSOKeyspaceGroupIDKey] is only used to judge whether there is keyspace group id. + // It will not update the keyspace group id when merging or splitting. TSOKeyspaceGroupIDKey = "tso_keyspace_group_id" // MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128. // We use 120 here to leave some space for other operations. @@ -783,7 +785,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID if err != nil { return err } + manager.kgm.Lock() manager.kgm.groups[endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)].Put(defaultKeyspaceGroup) + manager.kgm.Unlock() // If all keyspaces in the current batch are assigned, update the next start ID. manager.nextPatrolStartID = nextStartID } diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 88d478fd50f..084380549fc 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -388,6 +388,20 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind) return config, nil } +// GetGroupByKeyspaceID returns the keyspace group ID for the given keyspace ID. +func (m *GroupManager) GetGroupByKeyspaceID(id uint32) (uint32, error) { + m.RLock() + defer m.RUnlock() + for _, groups := range m.groups { + for _, group := range groups.GetAll() { + if slice.Contains(group.Keyspaces, id) { + return group.ID, nil + } + } + } + return 0, ErrKeyspaceNotInAnyKeyspaceGroup +} + var failpointOnce sync.Once // UpdateKeyspaceForGroup updates the keyspace field for the keyspace group. diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 100b0eb6986..2923dc7053f 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -70,6 +70,8 @@ var ( } // ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group. ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group") + // ErrKeyspaceNotInAnyKeyspaceGroup is used to indicate target keyspace is not in any keyspace group. + ErrKeyspaceNotInAnyKeyspaceGroup = errors.New("keyspace is not in any keyspace group") // ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group. ErrNodeNotInKeyspaceGroup = errors.New("the tso node is not in this keyspace group") // ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group. diff --git a/server/api/hot_status.go b/server/api/hot_status.go index 0296155596b..749a371300a 100644 --- a/server/api/hot_status.go +++ b/server/api/hot_status.go @@ -210,7 +210,7 @@ func (h *hotStatusHandler) GetHotBuckets(w http.ResponseWriter, r *http.Request) ids[i] = id } } - stats := h.Handler.GetHotBuckets() + stats := h.Handler.GetHotBuckets(ids...) ret := HotBucketsResponse{} for regionID, stats := range stats { ret[regionID] = make([]*HotBucketsItem, len(stats)) diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index 2568a3c744d..b93dc84faf8 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -110,6 +110,20 @@ func LoadKeyspace(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return } + if value, ok := c.GetQuery("force_refresh_group_id"); ok && value == "true" { + groupManager := svr.GetKeyspaceGroupManager() + if groupManager == nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, managerUninitializedErr) + return + } + // keyspace has been checked in LoadKeyspace, so no need to check again. + groupID, err := groupManager.GetGroupByKeyspaceID(meta.GetId()) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + meta.Config[keyspace.TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(groupID), 10) + } c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta}) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 35e2ba9fa9a..7bb24d79cf1 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1348,7 +1348,6 @@ func TestSyncConfig(t *testing.T) { if v.updated { re.True(switchRaftV2) tc.opt.UseRaftV2() - re.EqualValues(0, tc.opt.GetMaxMergeRegionSize()) re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize()) success, switchRaftV2 = syncConfig(tc.storeConfigManager, tc.GetStores()) re.True(success) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 81484e12607..8d6d1b6b6bc 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -202,13 +202,7 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) { } // UseRaftV2 set some config for raft store v2 by default temporary. -// todo: remove this after raft store support this. -// disable merge check -func (o *PersistOptions) UseRaftV2() { - v := o.GetScheduleConfig().Clone() - v.MaxMergeRegionSize = 0 - o.SetScheduleConfig(v) -} +func (o *PersistOptions) UseRaftV2() {} const ( maxSnapshotCountKey = "schedule.max-snapshot-count" diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index ca4209a9215..4bdefefbb81 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -968,8 +968,12 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } re.NoError(suite.cluster.RunServers(serverList)) suite.cluster.WaitLeader() - newGroups, err := cli.ListResourceGroups(suite.ctx) - re.NoError(err) + var newGroups []*rmpb.ResourceGroup + testutil.Eventually(suite.Require(), func() bool { + var err error + newGroups, err = cli.ListResourceGroups(suite.ctx) + return err == nil + }, testutil.WithWaitFor(time.Second)) re.Equal(groups, newGroups) } diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 4e926687223..2dfa89acb52 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -283,6 +283,13 @@ func TestHotWithStoreID(t *testing.T) { re.Equal(buckets.GetStats().ReadKeys[0]/interval, item.ReadKeys) re.Equal(buckets.GetStats().WriteBytes[0]/interval, item.WriteBytes) re.Equal(buckets.GetStats().WriteKeys[0]/interval, item.WriteKeys) + + args = []string{"-u", pdAddr, "hot", "buckets", "2"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + hotBuckets = api.HotBucketsResponse{} + re.NoError(json.Unmarshal(output, &hotBuckets)) + re.Nil(hotBuckets[2]) } func TestHistoryHotRegions(t *testing.T) { diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 40315e835f8..1d0c8132c13 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -561,9 +561,8 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { args := []string{"-u", pdAddr, "keyspace-group"} output, err := pdctl.ExecuteCommand(cmd, append(args, "1")...) re.NoError(err) - err = json.Unmarshal(output, &keyspaceGroup) - re.NoError(err) + re.NoErrorf(err, "output: %s", string(output)) return len(keyspaceGroup.Members) == 2 }) for _, member := range keyspaceGroup.Members { diff --git a/tests/pdctl/keyspace/keyspace_test.go b/tests/pdctl/keyspace/keyspace_test.go new file mode 100644 index 00000000000..a0bab4114df --- /dev/null +++ b/tests/pdctl/keyspace/keyspace_test.go @@ -0,0 +1,103 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keyspace_test + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/testutil" + api "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/server/config" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" +) + +func TestKeyspace(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + keyspaces := make([]string, 0) + for i := 1; i < 10; i++ { + keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) + } + tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = keyspaces + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) + re.NoError(err) + defer ttc.Destroy() + cmd := pdctlCmd.GetRootCmd() + + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID) + + var k api.KeyspaceMeta + keyspaceName := "keyspace_1" + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace", keyspaceName} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &k)) + return k.GetName() == keyspaceName + }) + re.Equal(uint32(1), k.GetId()) + re.Equal(defaultKeyspaceGroupID, k.Config[keyspace.TSOKeyspaceGroupIDKey]) + + // split keyspace group. + newGroupID := "2" + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "split", "0", newGroupID, "1"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + // check keyspace group in keyspace whether changed. + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace", keyspaceName} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &k)) + return newGroupID == k.Config[keyspace.TSOKeyspaceGroupIDKey] + }) + + // test error name + args := []string{"-u", pdAddr, "keyspace", "error_name"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "Fail") + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) +} diff --git a/tools/pd-ctl/pdctl/command/keyspace_command.go b/tools/pd-ctl/pdctl/command/keyspace_command.go new file mode 100644 index 00000000000..a68e2f05a80 --- /dev/null +++ b/tools/pd-ctl/pdctl/command/keyspace_command.go @@ -0,0 +1,48 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + "net/http" + + "github.com/spf13/cobra" +) + +const keyspacePrefix = "pd/api/v2/keyspaces" + +// NewKeyspaceCommand returns a keyspace subcommand of rootCmd. +func NewKeyspaceCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "keyspace [command] [flags]", + Short: "show keyspace information", + Run: showKeyspaceCommandFunc, + } + return cmd +} + +func showKeyspaceCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + cmd.Usage() + return + } + + resp, err := doRequest(cmd, fmt.Sprintf("%s/%s?force_refresh_group_id=true", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) + if err != nil { + cmd.Printf("Failed to get the keyspace information: %s\n", err) + return + } + cmd.Println(resp) +} diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index 8fc3a454d7a..86494c046eb 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -66,6 +66,7 @@ func GetRootCmd() *cobra.Command { command.NewCompletionCommand(), command.NewUnsafeCommand(), command.NewKeyspaceGroupCommand(), + command.NewKeyspaceCommand(), ) rootCmd.Flags().ParseErrorsWhitelist.UnknownFlags = true