Skip to content

Commit

Permalink
Init default keyspace group in the tso service
Browse files Browse the repository at this point in the history
Changes:
1. Introduce the initialization logic of the default keyspace group.
	a. If the default keyspace group isn't configured in the etcd, every tso node/pod should initialize it and join the election for the primary of this group.
	b. If the default keyspace group is configured in the etcd, the tso nodes/pods which are assigned with this group will initialize it and join the election for the primary of this group.
2. Introduce the keyspace group membership restriction -- default keyspace always belongs to default keyspace group.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Apr 12, 2023
1 parent 3779808 commit e260abd
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 124 deletions.
4 changes: 2 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupSt
// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
defaultKeyspaceGroup := &endpoint.KeyspaceGroup{
ID: utils.DefaultKeySpaceGroupID,
ID: utils.DefaultKeyspaceGroupID,
UserKind: endpoint.Basic.String(),
}

Expand All @@ -79,7 +79,7 @@ func (m *GroupManager) Bootstrap() error {
m.groups[userKind].Put(defaultKeyspaceGroup)

// Load all the keyspace groups from the storage and add to the respective userKind groups.
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeySpaceGroupID, 0)
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) er
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
Expand Down
9 changes: 4 additions & 5 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *Server) IsServing() bool {
}

member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID)
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
Expand All @@ -221,7 +221,7 @@ func (s *Server) IsServing() bool {
// The entry at the index 0 is the primary's service endpoint.
func (s *Server) GetLeaderListenUrls() []string {
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID)
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return nil
Expand Down Expand Up @@ -436,15 +436,14 @@ func (s *Server) startServer() (err error) {
return err
}

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
// The param `false` means that we don't initialize the keyspace group manager
// by loading the keyspace group meta from etcd.
if err := s.keyspaceGroupManager.Initialize(false); err != nil {
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ const (

// DefaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap and reserved for users who haven't been assigned keyspace.
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// and reserved for users who haven't been assigned keyspace.
DefaultKeyspaceID = uint32(0)

// DefaultKeySpaceGroupID is the default key space group id.
// DefaultKeyspaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeySpaceGroupID = uint32(0)
DefaultKeyspaceGroupID = uint32(0)

// APIServiceName is the name of api server.
APIServiceName = "api"
Expand Down
93 changes: 62 additions & 31 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,12 @@ func NewKeyspaceGroupManager(
}

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
// Initialize the default keyspace group if not loading from storage
if !loadFromStorage {
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeySpaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
return nil
}

func (kgm *KeyspaceGroupManager) Initialize() error {
// Load the initial keyspace group assignment from storage with time limit
done := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(kgm.ctx)
go kgm.checkInitProgress(ctx, cancel, done)
watchStartRevision, err := kgm.initAssignment(ctx)
watchStartRevision, sawDefaultKeyspaceGroup, err := kgm.initAssignment(ctx)
done <- struct{}{}
if err != nil {
log.Error("failed to initialize keyspace group manager", errs.ZapError(err))
Expand All @@ -247,6 +236,12 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
return err
}

// Initialize the default keyspace group if it isn't configured in the storage.
if !sawDefaultKeyspaceGroup {
keyspaces := []uint32{mcsutils.DefaultKeyspaceID}
kgm.initDefaultKeysapceGroup(keyspaces)
}

// Watch/apply keyspace group membership/distribution meta changes dynamically.
kgm.wg.Add(1)
go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision)
Expand Down Expand Up @@ -284,14 +279,23 @@ func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel c
<-done
}

func (kgm *KeyspaceGroupManager) initDefaultKeysapceGroup(keyspaces []uint32) {
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
Keyspaces: keyspaces,
}
kgm.updateKeyspaceGroup(group)
}

// initAssignment loads initial keyspace group assignment from storage and initialize the group manager.
func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, error) {
// Return watchStartRevision, the start revision for watching keyspace group membership/distribution change.
func (kgm *KeyspaceGroupManager) initAssignment(
ctx context.Context,
) (watchStartRevision int64, sawDefaultKeyspaceGroup bool, err error) {
var (
// The start revision for watching keyspace group membership/distribution change
watchStartRevision int64
groups []*endpoint.KeyspaceGroup
more bool
err error
keyspaceGroupsLoaded uint32
revision int64
)
Expand All @@ -300,7 +304,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err
for {
revision, groups, more, err = kgm.loadKeyspaceGroups(ctx, keyspaceGroupsLoaded, kgm.loadKeyspaceGroupsBatchSize)
if err != nil {
return 0, err
return
}

keyspaceGroupsLoaded += uint32(len(groups))
Expand All @@ -313,10 +317,15 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err
for _, group := range groups {
select {
case <-ctx.Done():
return watchStartRevision, errs.ErrLoadKeyspaceGroupsTerminated
err = errs.ErrLoadKeyspaceGroupsTerminated
return
default:
}

if group.ID == mcsutils.DefaultKeyspaceGroupID {
sawDefaultKeyspaceGroup = true
}

kgm.updateKeyspaceGroup(group)
}

Expand All @@ -326,7 +335,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err
}

log.Info("loaded keyspace groups", zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded))
return watchStartRevision, nil
return
}

// loadKeyspaceGroups loads keyspace groups from the start ID with limit.
Expand Down Expand Up @@ -441,7 +450,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
return revision, wresp.Err()
}
for _, event := range wresp.Events {
id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key))
groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key))
if err != nil {
log.Warn("failed to extract keyspace group ID from the key path",
zap.String("key-path", string(event.Kv.Key)), zap.Error(err))
Expand All @@ -453,12 +462,20 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
group := &endpoint.KeyspaceGroup{}
if err := json.Unmarshal(event.Kv.Value, group); err != nil {
log.Warn("failed to unmarshal keyspace group",
zap.Uint32("keysapce-group-id", id),
zap.Uint32("keysapce-group-id", groupID),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
}
kgm.updateKeyspaceGroup(group)
case clientv3.EventTypeDelete:
kgm.deleteKeyspaceGroup(id)
if groupID == mcsutils.DefaultKeyspaceGroupID {
keyspaces := kgm.kgs[groupID].Keyspaces
kgm.deleteKeyspaceGroup(groupID)
log.Warn("removed default keyspace group meta config from the storage. " +
"now every tso node/pod will initialize it")
kgm.initDefaultKeysapceGroup(keyspaces)
} else {
kgm.deleteKeyspaceGroup(groupID)
}
}
}
revision = wresp.Header.Revision
Expand Down Expand Up @@ -516,7 +533,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
tsRootPath string
storage *endpoint.StorageEndpoint
)
if group.ID == mcsutils.DefaultKeySpaceGroupID {
if group.ID == mcsutils.DefaultKeyspaceGroupID {
tsRootPath = kgm.legacySvcRootPath
storage = kgm.legacySvcStorage
} else {
Expand Down Expand Up @@ -554,7 +571,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
return newKeyspaces[i] < newKeyspaces[j]
})

// Mostly, the membership has no change, so we optimize for this case.
// Mostly, the membership has no change, so optimize for this case.
sameMembership := true
if oldLen != newLen {
sameMembership = false
Expand All @@ -571,22 +588,36 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
defer kgm.Unlock()

if sameMembership {
// The keyspace group membership is not changed, so we reuse the old one.
// The keyspace group membership is not changed, so reuse the old one.
newGroup.KeyspaceLookupTable = oldGroup.KeyspaceLookupTable
} else {
// The keyspace group membership is changed, so we update the keyspace lookup table.
// The keyspace group membership is changed, so update the keyspace lookup table.
newGroup.KeyspaceLookupTable = make(map[uint32]struct{})
for i, j := 0, 0; i < oldLen || j < newLen; {
if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] {
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
if groupID != mcsutils.DefaultKeyspaceGroupID && newKeyspaces[j] == mcsutils.DefaultKeyspaceID {
log.Warn("try to move default keyspace to non-default keyspace group. ignore it",
zap.Uint32("keyspace-group-id", groupID))
} else {
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
}
i++
j++
} else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
if groupID == mcsutils.DefaultKeyspaceGroupID && oldKeyspaces[i] == mcsutils.DefaultKeyspaceID {
log.Warn("try to move default keyspace out of default keyspace group. ignore it")
} else {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
}
i++
} else {
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
kgm.keyspaceLookupTable[newKeyspaces[j]] = groupID
if groupID != mcsutils.DefaultKeyspaceGroupID && newKeyspaces[j] == mcsutils.DefaultKeyspaceID {
log.Warn("try to move default keyspace to non-default keyspace group. ignore it",
zap.Uint32("keyspace-group-id", groupID))
} else {
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
kgm.keyspaceLookupTable[newKeyspaces[j]] = groupID
}
j++
}
}
Expand Down
Loading

0 comments on commit e260abd

Please sign in to comment.