Skip to content

Commit

Permalink
Fix data race between read APIs and finishSplit/finishMerge in the keyspace group manager
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Jun 30, 2023
1 parent fa721e7 commit 33d4300
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 31 deletions.
21 changes: 21 additions & 0 deletions pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,27 @@ type KeyspaceGroup struct {
KeyspaceLookupTable map[uint32]struct{} `json:"-"`
}

// NewKeyspaceGroup allocates a KeyspaceGroup and fills it in from the given
// values. The arguments are stored as passed: the state pointers, slices and
// lookup table are not copied, so the returned group shares them with the caller.
func NewKeyspaceGroup(
	id uint32,
	userKind string,
	splitState *SplitState,
	mergeState *MergeState,
	members []KeyspaceGroupMember,
	keyspaces []uint32,
	keyspaceLookupTable map[uint32]struct{},
) *KeyspaceGroup {
	kg := new(KeyspaceGroup)
	kg.ID = id
	kg.UserKind = userKind
	kg.SplitState = splitState
	kg.MergeState = mergeState
	kg.Members = members
	kg.Keyspaces = keyspaces
	kg.KeyspaceLookupTable = keyspaceLookupTable
	return kg
}

// IsSplitting checks if the keyspace group is in split state.
func (kg *KeyspaceGroup) IsSplitting() bool {
return kg != nil && kg.SplitState != nil
Expand Down
94 changes: 63 additions & 31 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,39 @@ func (s *state) getKeyspaceGroupMeta(
return s.ams[groupID], s.kgs[groupID]
}

// checkTSOSplit resolves, under the state read lock, the allocator managers
// needed to check a TSO split for the given group.
//
// If the group is not a split target, all results are nil. Otherwise it returns
// the target group's allocator manager together with the split source's
// allocator manager. If the split source has no allocator manager or no group
// registered, it logs the problem and returns ErrKeyspaceGroupNotInitialized
// for the source group ID.
func (s *state) checkTSOSplit(
	targetGroupID uint32,
) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
	s.RLock()
	defer s.RUnlock()

	targetAM := s.ams[targetGroupID]
	targetGroup := s.kgs[targetGroupID]
	// Only the split target keyspace group needs to check the TSO split.
	if !targetGroup.IsSplitTarget() {
		return nil, nil, nil
	}

	sourceID := targetGroup.SplitSource()
	sourceAM := s.ams[sourceID]
	sourceGroup := s.kgs[sourceID]
	if sourceAM == nil || sourceGroup == nil {
		log.Error("the split source keyspace group is not initialized",
			zap.Uint32("source", sourceID))
		return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceID)
	}
	return targetAM, sourceAM, nil
}

// checkTSOMerge reports, under the state read lock, whether requests for the
// given keyspace group must be rejected because the group is merging.
// It returns ErrKeyspaceGroupIsMerging when the group is registered and in the
// merging state, so callers wait for the merging checker to finish the TSO
// merging; it returns nil otherwise.
func (s *state) checkTSOMerge(
	groupID uint32,
) error {
	s.RLock()
	defer s.RUnlock()
	if group := s.kgs[groupID]; group != nil && group.IsMerging() {
		return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID)
	}
	return nil
}

// getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace.
// It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta
// of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given
Expand Down Expand Up @@ -957,7 +990,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
err = kgm.checkTSOMerge(curKeyspaceGroupID)
err = kgm.state.checkTSOMerge(curKeyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
Expand Down Expand Up @@ -1032,19 +1065,11 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
keyspaceGroupID uint32,
dcLocation string,
) error {
splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
// Only the split target keyspace group needs to check the TSO split.
if !splitGroup.IsSplitTarget() {
return nil
}
splitSource := splitGroup.SplitSource()
splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", splitSource))
return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource)
splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID)
if err != nil {
return err
}
splitAllocator, err := splitAM.GetAllocator(dcLocation)
splitAllocator, err := splitTargetAM.GetAllocator(dcLocation)
if err != nil {
return err
}
Expand Down Expand Up @@ -1116,9 +1141,18 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
splitGroup.SplitState = nil
kgm.kgs[id] = splitGroup
// Pre-update the split keyspace group's split state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
newSplitGroup := endpoint.NewKeyspaceGroup(
splitGroup.ID,
splitGroup.UserKind,
nil, // erase the split state here
splitGroup.MergeState,
splitGroup.Members,
splitGroup.Keyspaces,
splitGroup.KeyspaceLookupTable,
)
kgm.kgs[id] = newSplitGroup
return nil
}

Expand Down Expand Up @@ -1146,9 +1180,19 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
mergeTarget.MergeState = nil
kgm.kgs[id] = mergeTarget

// Pre-update the merge target keyspace group's merge state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
newTargetGroup := endpoint.NewKeyspaceGroup(
mergeTarget.ID,
mergeTarget.UserKind,
mergeTarget.SplitState,
nil, // erase the merge state here
mergeTarget.Members,
mergeTarget.Keyspaces,
mergeTarget.KeyspaceLookupTable,
)
kgm.kgs[id] = newTargetGroup
return nil
}

Expand Down Expand Up @@ -1286,15 +1330,3 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
return
}
}

// checkTSOMerge rejects requests for a keyspace group that is in the merging
// state: it returns ErrKeyspaceGroupIsMerging for that group so the caller
// waits for the merging checker to finish the TSO merging, and nil otherwise.
func (kgm *KeyspaceGroupManager) checkTSOMerge(
	keyspaceGroupID uint32,
) error {
	if _, kg := kgm.getKeyspaceGroupMeta(keyspaceGroupID); kg.IsMerging() {
		return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
	}
	return nil
}

0 comments on commit 33d4300

Please sign in to comment.