diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 4c69704d7313..2fc412fdd8c2 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -59,7 +59,7 @@ func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupSt // Bootstrap saves default keyspace group info and init group mapping in the memory. func (m *GroupManager) Bootstrap() error { defaultKeyspaceGroup := &endpoint.KeyspaceGroup{ - ID: utils.DefaultKeySpaceGroupID, + ID: utils.DefaultKeyspaceGroupID, UserKind: endpoint.Basic.String(), } @@ -79,7 +79,7 @@ func (m *GroupManager) Bootstrap() error { m.groups[userKind].Put(defaultKeyspaceGroup) // Load all the keyspace groups from the storage and add to the respective userKind groups. - groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeySpaceGroupID, 0) + groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0) if err != nil { return err } diff --git a/pkg/mcs/tso/server/handler.go b/pkg/mcs/tso/server/handler.go index 14f83bccc613..827b6773ba16 100644 --- a/pkg/mcs/tso/server/handler.go +++ b/pkg/mcs/tso/server/handler.go @@ -38,7 +38,7 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) er zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck)) - tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID) + tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID) if err != nil { log.Error("failed to get allocator manager", errs.ZapError(err)) return err diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index a54ce8e9118e..ebf389736f7e 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -209,7 +209,7 @@ func (s *Server) IsServing() bool { } member, err := s.keyspaceGroupManager.GetElectionMember( - mcsutils.DefaultKeyspaceID, 
mcsutils.DefaultKeySpaceGroupID) + mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) if err != nil { log.Error("failed to get election member", errs.ZapError(err)) return false @@ -221,7 +221,7 @@ func (s *Server) IsServing() bool { // The entry at the index 0 is the primary's service endpoint. func (s *Server) GetLeaderListenUrls() []string { member, err := s.keyspaceGroupManager.GetElectionMember( - mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID) + mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) if err != nil { log.Error("failed to get election member", errs.ZapError(err)) return nil @@ -436,15 +436,14 @@ func (s *Server) startServer() (err error) { return err } + // Initialize the TSO service. s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10)) tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg) - // The param `false` means that we don't initialize the keyspace group manager - // by loading the keyspace group meta from etcd. - if err := s.keyspaceGroupManager.Initialize(false); err != nil { + if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 64e04d2a1111..aa81425bc9f0 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -37,12 +37,13 @@ const ( // DefaultKeyspaceID is the default key space id. // Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215) - // ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap and reserved for users who haven't been assigned keyspace. 
+ // ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap + // and reserved for users who haven't been assigned keyspace. DefaultKeyspaceID = uint32(0) - // DefaultKeySpaceGroupID is the default key space group id. + // DefaultKeyspaceGroupID is the default key space group id. // We also reserved 0 for the keyspace group for the same purpose. - DefaultKeySpaceGroupID = uint32(0) + DefaultKeyspaceGroupID = uint32(0) // APIServiceName is the name of api server. APIServiceName = "api" diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 90bbb79f0651..130af5d02d5a 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -222,23 +222,12 @@ func NewKeyspaceGroupManager( } // Initialize this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { - // Initialize the default keyspace group if not loading from storage - if !loadFromStorage { - group := &endpoint.KeyspaceGroup{ - ID: mcsutils.DefaultKeySpaceGroupID, - Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, - Keyspaces: []uint32{mcsutils.DefaultKeyspaceID}, - } - kgm.updateKeyspaceGroup(group) - return nil - } - +func (kgm *KeyspaceGroupManager) Initialize() error { // Load the initial keyspace group assignment from storage with time limit done := make(chan struct{}, 1) ctx, cancel := context.WithCancel(kgm.ctx) go kgm.checkInitProgress(ctx, cancel, done) - watchStartRevision, err := kgm.initAssignment(ctx) + watchStartRevision, sawDefaultKeyspaceGroup, err := kgm.initAssignment(ctx) done <- struct{}{} if err != nil { log.Error("failed to initialize keyspace group manager", errs.ZapError(err)) @@ -247,6 +236,12 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { return err } + // Initialize the default keyspace group if it isn't configured in the storage. 
+ if !sawDefaultKeyspaceGroup { + keyspaces := []uint32{mcsutils.DefaultKeyspaceID} + kgm.initDefaultKeysapceGroup(keyspaces) + } + // Watch/apply keyspace group membership/distribution meta changes dynamically. kgm.wg.Add(1) go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision) @@ -284,14 +279,23 @@ func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel c <-done } +func (kgm *KeyspaceGroupManager) initDefaultKeysapceGroup(keyspaces []uint32) { + group := &endpoint.KeyspaceGroup{ + ID: mcsutils.DefaultKeyspaceGroupID, + Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, + Keyspaces: keyspaces, + } + kgm.updateKeyspaceGroup(group) +} + // initAssignment loads initial keyspace group assignment from storage and initialize the group manager. -func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, error) { +// Return watchStartRevision, the start revision for watching keyspace group membership/distribution change. 
+func (kgm *KeyspaceGroupManager) initAssignment( + ctx context.Context, +) (watchStartRevision int64, sawDefaultKeyspaceGroup bool, err error) { var ( - // The start revision for watching keyspace group membership/distribution change - watchStartRevision int64 groups []*endpoint.KeyspaceGroup more bool - err error keyspaceGroupsLoaded uint32 revision int64 ) @@ -300,7 +304,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err for { revision, groups, more, err = kgm.loadKeyspaceGroups(ctx, keyspaceGroupsLoaded, kgm.loadKeyspaceGroupsBatchSize) if err != nil { - return 0, err + return } keyspaceGroupsLoaded += uint32(len(groups)) @@ -313,10 +317,15 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err for _, group := range groups { select { case <-ctx.Done(): - return watchStartRevision, errs.ErrLoadKeyspaceGroupsTerminated + err = errs.ErrLoadKeyspaceGroupsTerminated + return default: } + if group.ID == mcsutils.DefaultKeyspaceGroupID { + sawDefaultKeyspaceGroup = true + } + kgm.updateKeyspaceGroup(group) } @@ -326,7 +335,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err } log.Info("loaded keyspace groups", zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded)) - return watchStartRevision, nil + return } // loadKeyspaceGroups loads keyspace groups from the start ID with limit. 
@@ -441,7 +450,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( return revision, wresp.Err() } for _, event := range wresp.Events { - id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) + groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) if err != nil { log.Warn("failed to extract keyspace group ID from the key path", zap.String("key-path", string(event.Kv.Key)), zap.Error(err)) @@ -453,12 +462,20 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( group := &endpoint.KeyspaceGroup{} if err := json.Unmarshal(event.Kv.Value, group); err != nil { log.Warn("failed to unmarshal keyspace group", - zap.Uint32("keysapce-group-id", id), + zap.Uint32("keysapce-group-id", groupID), zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) } kgm.updateKeyspaceGroup(group) case clientv3.EventTypeDelete: - kgm.deleteKeyspaceGroup(id) + if groupID == mcsutils.DefaultKeyspaceGroupID { + keyspaces := kgm.kgs[groupID].Keyspaces + kgm.deleteKeyspaceGroup(groupID) + log.Warn("removed default keyspace group meta config from the storage. " + + "now every tso node/pod will initialize it") + kgm.initDefaultKeysapceGroup(keyspaces) + } else { + kgm.deleteKeyspaceGroup(groupID) + } } } revision = wresp.Header.Revision @@ -516,7 +533,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro tsRootPath string storage *endpoint.StorageEndpoint ) - if group.ID == mcsutils.DefaultKeySpaceGroupID { + if group.ID == mcsutils.DefaultKeyspaceGroupID { tsRootPath = kgm.legacySvcRootPath storage = kgm.legacySvcStorage } else { @@ -554,7 +571,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( return newKeyspaces[i] < newKeyspaces[j] }) - // Mostly, the membership has no change, so we optimize for this case. + // Mostly, the membership has no change, so optimize for this case. 
sameMembership := true if oldLen != newLen { sameMembership = false @@ -571,22 +588,36 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( defer kgm.Unlock() if sameMembership { - // The keyspace group membership is not changed, so we reuse the old one. + // The keyspace group membership is not changed, so reuse the old one. newGroup.KeyspaceLookupTable = oldGroup.KeyspaceLookupTable } else { - // The keyspace group membership is changed, so we update the keyspace lookup table. + // The keyspace group membership is changed, so update the keyspace lookup table. newGroup.KeyspaceLookupTable = make(map[uint32]struct{}) for i, j := 0, 0; i < oldLen || j < newLen; { if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] { - newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{} + if groupID != mcsutils.DefaultKeyspaceGroupID && newKeyspaces[j] == mcsutils.DefaultKeyspaceID { + log.Warn("try to move default keyspace to non-default keyspace group. ignore it", + zap.Uint32("keyspace-group-id", groupID)) + } else { + newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{} + } i++ j++ } else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen { - delete(kgm.keyspaceLookupTable, oldKeyspaces[i]) + if groupID == mcsutils.DefaultKeyspaceGroupID && oldKeyspaces[i] == mcsutils.DefaultKeyspaceID { + log.Warn("try to move default keyspace out of default keyspace group. ignore it") + } else { + delete(kgm.keyspaceLookupTable, oldKeyspaces[i]) + } i++ } else { - newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{} - kgm.keyspaceLookupTable[newKeyspaces[j]] = groupID + if groupID != mcsutils.DefaultKeyspaceGroupID && newKeyspaces[j] == mcsutils.DefaultKeyspaceID { + log.Warn("try to move default keyspace to non-default keyspace group. 
ignore it", + zap.Uint32("keyspace-group-id", groupID)) + } else { + newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{} + kgm.keyspaceLookupTable[newKeyspaces[j]] = groupID + } j++ } } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index bb68815212d0..a71d92f7461c 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -96,31 +96,32 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { tsoSvcRootPath := path.Join("/ms", guid, "tso") electionNamePrefix := "tso-server-" + guid - ksgMgr := NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, suite.cfg) - err := ksgMgr.Initialize(false) + kgm := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, electionNamePrefix, + legacySvcRootPath, tsoSvcRootPath, suite.cfg) + err := kgm.Initialize() re.NoError(err) - re.Equal(tsoServiceID, ksgMgr.tsoServiceID) - re.Equal(suite.etcdClient, ksgMgr.etcdClient) - re.Equal(electionNamePrefix, ksgMgr.electionNamePrefix) - re.Equal(legacySvcRootPath, ksgMgr.legacySvcRootPath) - re.Equal(tsoSvcRootPath, ksgMgr.tsoSvcRootPath) - re.Equal(suite.cfg, ksgMgr.cfg) - re.Equal(defaultLoadKeyspaceGroupsBatchSize, ksgMgr.loadKeyspaceGroupsBatchSize) - re.Equal(defaultLoadKeyspaceGroupsTimeout, ksgMgr.loadKeyspaceGroupsTimeout) + re.Equal(tsoServiceID, kgm.tsoServiceID) + re.Equal(suite.etcdClient, kgm.etcdClient) + re.Equal(electionNamePrefix, kgm.electionNamePrefix) + re.Equal(legacySvcRootPath, kgm.legacySvcRootPath) + re.Equal(tsoSvcRootPath, kgm.tsoSvcRootPath) + re.Equal(suite.cfg, kgm.cfg) + re.Equal(defaultLoadKeyspaceGroupsBatchSize, kgm.loadKeyspaceGroupsBatchSize) + re.Equal(defaultLoadKeyspaceGroupsTimeout, kgm.loadKeyspaceGroupsTimeout) - am, err := ksgMgr.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID) + am, err := 
kgm.GetAllocatorManager(mcsutils.DefaultKeyspaceGroupID) re.NoError(err) re.False(am.enableLocalTSO) - re.Equal(mcsutils.DefaultKeySpaceGroupID, am.kgID) + re.Equal(mcsutils.DefaultKeyspaceGroupID, am.kgID) re.Equal(mcsutils.DefaultLeaderLease, am.leaderLease) re.Equal(time.Hour*24, am.maxResetTSGap()) re.Equal(legacySvcRootPath, am.rootPath) re.Equal(time.Duration(mcsutils.DefaultLeaderLease)*time.Second, am.saveInterval) re.Equal(time.Duration(50)*time.Millisecond, am.updatePhysicalInterval) - ksgMgr.Close() + kgm.Close() } // TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment. @@ -187,7 +188,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { // the loading sleep 3 seconds. mgr.loadKeyspaceGroupsTimeout = time.Second re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delayLoadKeyspaceGroups", "return(3)")) - err := mgr.Initialize(true) + err := mgr.Initialize() // If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated. re.Equal(errs.ErrLoadKeyspaceGroupsTerminated, err) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delayLoadKeyspaceGroups")) @@ -210,7 +211,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem // loading from etcd fail 2 times but the whole initialization still succeeds. mgr.loadFromEtcdMaxRetryTimes = 3 re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail", "return(2)")) - err := mgr.Initialize(true) + err := mgr.Initialize() re.NoError(err) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail")) } @@ -232,7 +233,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { // loading from etcd fail 3 times which should cause the whole initialization to fail. 
mgr.loadFromEtcdMaxRetryTimes = 3 re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail", "return(3)")) - err := mgr.Initialize(true) + err := mgr.Initialize() re.Error(err) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail")) } @@ -246,7 +247,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0) re.NotNil(mgr) defer mgr.Close() - err := mgr.Initialize(true) + err := mgr.Initialize() re.NoError(err) rootPath := mgr.legacySvcRootPath @@ -256,37 +257,37 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( events := []*etcdEvent{} // Assign keyspace group 0 to this host/pod/keyspace-group-manager. // final result: [0] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 0, []string{svcAddr}) + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr})) // Assign keyspace group 1 to this host/pod/keyspace-group-manager. // final result: [0,1] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 1, []string{"unknown", svcAddr}) + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 1, []uint32{1}, []string{"unknown", svcAddr})) // Assign keyspace group 2 to other host/pod/keyspace-group-manager. // final result: [0,1] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{"unknown"}) + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{"unknown"})) // Assign keyspace group 3 to this host/pod/keyspace-group-manager. 
// final result: [0,1,3] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{svcAddr}) - // Delete keyspace group 0 - // final result: [1,3] - events = generateKeyspaceGroupEvent(events, mvccpb.DELETE, 0, []string{}) + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{svcAddr})) + // Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0. + // final result: [0,1,3] + events = append(events, generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{})) // Put keyspace group 4 which doesn't belong to anyone. - // final result: [1,3] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{}) + // final result: [0,1,3] + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{})) // Put keyspace group 5 which doesn't belong to anyone. - // final result: [1,3] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 5, []string{}) + // final result: [0,1,3] + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 5, []uint32{5}, []string{})) // Assign keyspace group 2 to this host/pod/keyspace-group-manager. - // final result: [1,2,3] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{svcAddr}) + // final result: [0,1,2,3] + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 2, []uint32{2}, []string{svcAddr})) // Reassign keyspace group 3 to no one. - // final result: [1,2] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{}) + // final result: [0,1,2] + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 3, []uint32{3}, []string{})) // Reassign keyspace group 4 to this host/pod/keyspace-group-manager. 
- // final result: [1,2,4] - events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{svcAddr}) + // final result: [0,1,2,4] + events = append(events, generateKeyspaceGroupEvent(mvccpb.PUT, 4, []uint32{4}, []string{svcAddr})) // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. - idsExpected := []int{1, 2, 4} + expectedGroupIDs := []uint32{0, 1, 2, 4} // Apply the keyspace group assignment change events to etcd. for _, event := range events { @@ -302,8 +303,76 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( // Verify the keyspace group assignment. testutil.Eventually(re, func() bool { - idsAssigned := collectAssignedKeyspaceGroupIDs(re, mgr) - return reflect.DeepEqual(idsExpected, idsAssigned) + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) + }) +} + +// TestInitDefaultKeyspaceGroup tests the initialization logic of the default keyspace group. +// If the default keyspace group isn't configured in the etcd, every tso node/pod should initialize +// it and join the election for the primary of this group. +// If the default keyspace group is configured in the etcd, the tso nodes/pods which are assigned with +// this group will initialize it and join the election for the primary of this group. +func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { + re := suite.Require() + + var ( + expectedGroupIDs []uint32 + event *etcdEvent + ) + + // Start with the empty keyspace group assignment.
+ mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0) + defer mgr.Close() + err := mgr.Initialize() + re.NoError(err) + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + expectedGroupIDs = []uint32{0} + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + re.True(reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs)) + + // Config keyspace group 0 in the storage but assigned to no one. + // final result: [] + expectedGroupIDs = []uint32{} + event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{"unknown"}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + testutil.Eventually(re, func() bool { + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) + }) + // Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager. + // final result: [0] + expectedGroupIDs = []uint32{0} + event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + testutil.Eventually(re, func() bool { + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) + }) + // Delete keyspace group 0. Every tso node/pod now should initialize keyspace group 0. + // final result: [0] + expectedGroupIDs = []uint32{0} + event = generateKeyspaceGroupEvent(mvccpb.DELETE, 0, []uint32{0}, []string{}) + err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) + re.NoError(err) + testutil.Eventually(re, func() bool { + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) + }) + // Config keyspace group 0 in the storage and assigned to this host/pod/keyspace-group-manager. 
+ // final result: [0] + expectedGroupIDs = []uint32{0} + event = generateKeyspaceGroupEvent(mvccpb.PUT, 0, []uint32{0}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + testutil.Eventually(re, func() bool { + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) }) } @@ -327,34 +396,93 @@ func (suite *keyspaceGroupManagerTestSuite) TestGetAMWithMembershipCheck() { mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0, 1, 2}) - err = mgr.Initialize(true) + err = mgr.Initialize() re.NoError(err) // Should be able to get AM for keyspace 0, 1, 2 in keyspace group 0. - am, kgid, err = mgr.state.getAMWithMembershipCheck(0, 0) + am, kgid, err = mgr.getAMWithMembershipCheck(0, 0) re.NoError(err) re.Equal(uint32(0), kgid) re.NotNil(am) - am, kgid, err = mgr.state.getAMWithMembershipCheck(1, 0) + am, kgid, err = mgr.getAMWithMembershipCheck(1, 0) re.NoError(err) re.Equal(uint32(0), kgid) re.NotNil(am) - am, kgid, err = mgr.state.getAMWithMembershipCheck(2, 0) + am, kgid, err = mgr.getAMWithMembershipCheck(2, 0) re.NoError(err) re.Equal(uint32(0), kgid) re.NotNil(am) // Should fail because keyspace 3 is not in keyspace group 0. - am, kgid, err = mgr.state.getAMWithMembershipCheck(3, 0) + am, kgid, err = mgr.getAMWithMembershipCheck(3, 0) re.Error(err) re.Equal(uint32(0), kgid) re.Nil(am) // Should fail because keyspace group 1 doesn't exist. - am, kgid, err = mgr.state.getAMWithMembershipCheck(0, 1) + am, kgid, err = mgr.getAMWithMembershipCheck(0, 1) re.Error(err) re.Equal(uint32(0), kgid) re.Nil(am) } +// TestDefaultMembershipRestriction tests the restriction that the default keyspace always +// belongs to the default keyspace group.
+func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { + re := suite.Require() + + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1) + re.NotNil(mgr) + defer mgr.Close() + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + var ( + am *AllocatorManager + kgid uint32 + err error + event *etcdEvent + ) + + // Create keyspace group 0 which contains keyspace 0, 1, 2. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 1, 2}) + // Create keyspace group 3 which contains keyspace 3, 4. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + uint32(3), []uint32{3, 4}) + + err = mgr.Initialize() + re.NoError(err) + + event = generateKeyspaceGroupEvent( + mvccpb.PUT, mcsutils.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + event = generateKeyspaceGroupEvent( + mvccpb.PUT, 3, []uint32{mcsutils.DefaultKeyspaceID, 3, 4}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + + // Should be able to still get AM for keyspace 0 in keyspace group 0. + // Sleep for a while to wait for the events to propagate. If the restriction is not working, + // it will cause random failure. + time.Sleep(1 * time.Second) + // Should be able to get AM for keyspace 0 in keyspace group 0. 
+ am, kgid, err = mgr.getAMWithMembershipCheck( + mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) + re.NoError(err) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid) + re.NotNil(am) + // Can't get the default keyspace from the keyspace group 3 + am, kgid, err = mgr.getAMWithMembershipCheck(mcsutils.DefaultKeyspaceID, 3) + re.Error(err) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid) + re.Nil(am) +} + // TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives // a tso request with mismatched keyspace and keyspace group. func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() { @@ -370,7 +498,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembers mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0, 1, 2}) - err := mgr.Initialize(true) + err := mgr.Initialize() re.NoError(err) // Should fail because keyspace 0 is not in keyspace group 1 and the API returns @@ -386,23 +514,21 @@ type etcdEvent struct { } func generateKeyspaceGroupEvent( - events []*etcdEvent, eventType mvccpb.Event_EventType, id uint32, addrs []string, -) []*etcdEvent { + eventType mvccpb.Event_EventType, groupID uint32, keyspaces []uint32, addrs []string, +) *etcdEvent { members := []endpoint.KeyspaceGroupMember{} for _, addr := range addrs { members = append(members, endpoint.KeyspaceGroupMember{Address: addr}) } - return append(events, - &etcdEvent{ - eventType: eventType, - ksg: &endpoint.KeyspaceGroup{ - ID: id, - Members: members, - Keyspaces: []uint32{id}, - }, + return &etcdEvent{ + eventType: eventType, + ksg: &endpoint.KeyspaceGroup{ + ID: groupID, + Members: members, + Keyspaces: keyspaces, }, - ) + } } // runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. 
@@ -415,7 +541,7 @@ func runTestLoadKeyspaceGroupsAssignment( loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod ) { - idsExpected := []int{} + expectedGroupIDs := []uint32{} mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, loadKeyspaceGroupsBatchSize) re.NotNil(mgr) defer mgr.Close() @@ -442,7 +568,7 @@ func runTestLoadKeyspaceGroupsAssignment( if j < len(mgr.ams) && randomGen.Intn(100) < probabilityAssignToMe { assignToMe = true mux.Lock() - idsExpected = append(idsExpected, j) + expectedGroupIDs = append(expectedGroupIDs, uint32(j)) mux.Unlock() } addKeyspaceGroupAssignment( @@ -453,13 +579,21 @@ func runTestLoadKeyspaceGroupsAssignment( } wg.Wait() - err := mgr.Initialize(true) + err := mgr.Initialize() re.NoError(err) + // If no keyspace group is assigned to this host/pod, the default keyspace group should be initialized. + if numberOfKeypaceGroupsToAdd <= 0 { + expectedGroupIDs = append(expectedGroupIDs, mcsutils.DefaultKeyspaceGroupID) + } + // Verify the keyspace group assignment. 
- sort.Ints(idsExpected) - idsAssigned := collectAssignedKeyspaceGroupIDs(re, mgr) - re.Equal(idsExpected, idsAssigned) + // Sort the expected keyspace group IDs in ascending order. + sort.Slice(expectedGroupIDs, func(i, j int) bool { + return expectedGroupIDs[i] < expectedGroupIDs[j] + }) + assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) + re.Equal(expectedGroupIDs, assignedGroupIDs) } func newUniqueKeyspaceGroupManager( @@ -547,23 +681,23 @@ func addKeyspaceGroupAssignment( return nil } -func collectAssignedKeyspaceGroupIDs(re *require.Assertions, ksgMgr *KeyspaceGroupManager) []int { - ksgMgr.RLock() - defer ksgMgr.RUnlock() +func collectAssignedKeyspaceGroupIDs(re *require.Assertions, kgm *KeyspaceGroupManager) []uint32 { + kgm.RLock() + defer kgm.RUnlock() - ids := []int{} - for i := 0; i < len(ksgMgr.kgs); i++ { - ksg := ksgMgr.kgs[i] - if ksg == nil { - re.Nil(ksgMgr.ams[i], fmt.Sprintf("ksg is nil but am is not nil for id %d", i)) + ids := []uint32{} + for i := 0; i < len(kgm.kgs); i++ { + kg := kgm.kgs[i] + if kg == nil { + re.Nil(kgm.ams[i], fmt.Sprintf("ksg is nil but am is not nil for id %d", i)) } else { - am := ksgMgr.ams[i] + am := kgm.ams[i] re.NotNil(am, fmt.Sprintf("ksg is not nil but am is nil for id %d", i)) re.Equal(i, int(am.kgID)) - re.Equal(i, int(ksg.ID)) - for _, m := range ksg.Members { - if m.Address == ksgMgr.tsoServiceID.ServiceAddr { - ids = append(ids, i) + re.Equal(i, int(kg.ID)) + for _, m := range kg.Members { + if m.Address == kgm.tsoServiceID.ServiceAddr { + ids = append(ids, uint32(i)) break } } @@ -577,9 +711,16 @@ func (suite *keyspaceGroupManagerTestSuite) TestUpdateKeyspaceGroupMembership() re := suite.Require() // Start from an empty keyspace group. - oldGroup := &endpoint.KeyspaceGroup{ID: 0, Keyspaces: []uint32{}} - newGroup := &endpoint.KeyspaceGroup{ID: 0, Keyspaces: []uint32{}} - kgm := &KeyspaceGroupManager{state: state{keyspaceLookupTable: make(map[uint32]uint32)}} + // Use non-default keyspace group ID.
+ // The default keyspace group always contains the default keyspace. + // We have dedicated tests for the default keyspace group. + groupID := uint32(1) + oldGroup := &endpoint.KeyspaceGroup{ID: groupID, Keyspaces: []uint32{}} + newGroup := &endpoint.KeyspaceGroup{ID: groupID, Keyspaces: []uint32{}} + kgm := &KeyspaceGroupManager{ + state: state{ + keyspaceLookupTable: make(map[uint32]uint32), + }} kgm.updateKeyspaceGroupMembership(oldGroup, newGroup) verifyLocalKeyspaceLookupTable(re, newGroup.KeyspaceLookupTable, newGroup.Keyspaces) diff --git a/pkg/utils/tsoutil/tso_request.go b/pkg/utils/tsoutil/tso_request.go index b1998e458da6..b2459ccb371b 100644 --- a/pkg/utils/tsoutil/tso_request.go +++ b/pkg/utils/tsoutil/tso_request.go @@ -141,7 +141,7 @@ func (r *PDProtoRequest) getCount() uint32 { // count defins the count of timestamps to retrieve. func (r *PDProtoRequest) process(forwardStream stream, count uint32, tsoProtoFactory ProtoFactory) (tsoResp, error) { return forwardStream.process(r.request.GetHeader().GetClusterId(), count, - utils.DefaultKeyspaceID, utils.DefaultKeySpaceGroupID, r.request.GetDcLocation()) + utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID, r.request.GetDcLocation()) } // postProcess sends the response back to the sender of the request diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index fed09336c462..89bc6a8e5fc2 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -195,5 +195,5 @@ func validateKeyspaceGroupID(c *gin.Context) (uint32, error) { } func isValid(id uint32) bool { - return id >= utils.DefaultKeySpaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse + return id >= utils.DefaultKeyspaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse } diff --git a/server/grpc_service.go b/server/grpc_service.go index 42f85e643365..64136d1d0799 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1774,7 +1774,7 
@@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, KeyspaceId: utils.DefaultKeyspaceID, - KeyspaceGroupId: utils.DefaultKeySpaceGroupID, + KeyspaceGroupId: utils.DefaultKeyspaceGroupID, }, Count: 1, }) diff --git a/server/server.go b/server/server.go index 06ad2198288d..68663981069f 100644 --- a/server/server.go +++ b/server/server.go @@ -405,7 +405,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} if !s.IsAPIServiceMode() { - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeySpaceGroupID, s.member, s.rootPath, s.storage, s, false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {