From 040b6dde60564496e5029e1de5276e2cd087e9d2 Mon Sep 17 00:00:00 2001
From: Bin Shi
Date: Thu, 29 Jun 2023 18:41:08 -0700
Subject: [PATCH] Fix data race between read APIs and finishSplit/finishMerge
 in keyspace group manager

Signed-off-by: Bin Shi
---
 pkg/storage/endpoint/tso_keyspace_group.go | 21 +++++
 pkg/tso/keyspace_group_manager.go          | 94 +++++++++++++++-------
 2 files changed, 84 insertions(+), 31 deletions(-)

diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go
index 498cd878887a..054fb02f55e8 100644
--- a/pkg/storage/endpoint/tso_keyspace_group.go
+++ b/pkg/storage/endpoint/tso_keyspace_group.go
@@ -111,6 +111,27 @@ type KeyspaceGroup struct {
 	KeyspaceLookupTable map[uint32]struct{} `json:"-"`
 }
 
+// NewKeyspaceGroup creates a new keyspace group.
+func NewKeyspaceGroup(
+	id uint32,
+	userKind string,
+	splitState *SplitState,
+	mergeState *MergeState,
+	members []KeyspaceGroupMember,
+	keyspaces []uint32,
+	keyspaceLookupTable map[uint32]struct{},
+) *KeyspaceGroup {
+	return &KeyspaceGroup{
+		ID:                  id,
+		UserKind:            userKind,
+		SplitState:          splitState,
+		MergeState:          mergeState,
+		Members:             members,
+		Keyspaces:           keyspaces,
+		KeyspaceLookupTable: keyspaceLookupTable,
+	}
+}
+
 // IsSplitting checks if the keyspace group is in split state.
 func (kg *KeyspaceGroup) IsSplitting() bool {
 	return kg != nil && kg.SplitState != nil
diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go
index a82376430faa..92e002d3412b 100644
--- a/pkg/tso/keyspace_group_manager.go
+++ b/pkg/tso/keyspace_group_manager.go
@@ -112,6 +112,39 @@ func (s *state) getKeyspaceGroupMeta(
 	return s.ams[groupID], s.kgs[groupID]
 }
 
+func (s *state) checkTSOSplit(
+	targetGroupID uint32,
+) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
+	s.RLock()
+	defer s.RUnlock()
+	splitTargetAM, splitTargetGroup := s.ams[targetGroupID], s.kgs[targetGroupID]
+	// Only the split target keyspace group needs to check the TSO split; all-nil results tell the caller to skip it.
+	if !splitTargetGroup.IsSplitTarget() {
+		return nil, nil, nil
+	}
+	sourceGroupID := splitTargetGroup.SplitSource()
+	splitSourceAM, splitSourceGroup := s.ams[sourceGroupID], s.kgs[sourceGroupID]
+	if splitSourceAM == nil || splitSourceGroup == nil {
+		log.Error("the split source keyspace group is not initialized",
+			zap.Uint32("source", sourceGroupID))
+		return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceGroupID)
+	}
+	return splitTargetAM, splitSourceAM, nil
+}
+
+// checkTSOMerge rejects any request if the keyspace group is in merging state,
+// since we need to wait for the merging checker to finish the TSO merging.
+func (s *state) checkTSOMerge(
+	groupID uint32,
+) error {
+	s.RLock()
+	defer s.RUnlock()
+	if s.kgs[groupID] == nil || !s.kgs[groupID].IsMerging() {
+		return nil
+	}
+	return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID)
+}
+
 // getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace.
 // It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta
 // of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given
@@ -957,7 +990,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
 	if err != nil {
 		return pdpb.Timestamp{}, curKeyspaceGroupID, err
 	}
-	err = kgm.checkTSOMerge(curKeyspaceGroupID)
+	err = kgm.state.checkTSOMerge(curKeyspaceGroupID)
 	if err != nil {
 		return pdpb.Timestamp{}, curKeyspaceGroupID, err
 	}
@@ -1032,19 +1065,11 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
 	keyspaceGroupID uint32,
 	dcLocation string,
 ) error {
-	splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
-	// Only the split target keyspace group needs to check the TSO split.
-	if !splitGroup.IsSplitTarget() {
-		return nil
-	}
-	splitSource := splitGroup.SplitSource()
-	splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
-	if splitSourceAM == nil || splitSourceGroup == nil {
-		log.Error("the split source keyspace group is not initialized",
-			zap.Uint32("source", splitSource))
-		return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource)
+	splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID)
+	if err != nil || splitTargetAM == nil {
+		return err
 	}
-	splitAllocator, err := splitAM.GetAllocator(dcLocation)
+	splitAllocator, err := splitTargetAM.GetAllocator(dcLocation)
 	if err != nil {
 		return err
 	}
@@ -1116,9 +1141,18 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
 			zap.Int("status-code", statusCode))
 		return errs.ErrSendRequest.FastGenByArgs()
 	}
-	// Pre-update the split keyspace group split state in memory.
-	splitGroup.SplitState = nil
-	kgm.kgs[id] = splitGroup
+	// Pre-update the split keyspace group's split state in memory.
+	// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
+	newSplitGroup := endpoint.NewKeyspaceGroup(
+		splitGroup.ID,
+		splitGroup.UserKind,
+		nil,
+		splitGroup.MergeState,
+		splitGroup.Members,
+		splitGroup.Keyspaces,
+		splitGroup.KeyspaceLookupTable,
+	)
+	kgm.kgs[id] = newSplitGroup
 	return nil
 }
 
@@ -1146,9 +1180,19 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
 			zap.Int("status-code", statusCode))
 		return errs.ErrSendRequest.FastGenByArgs()
 	}
-	// Pre-update the split keyspace group split state in memory.
-	mergeTarget.MergeState = nil
-	kgm.kgs[id] = mergeTarget
+
+	// Pre-update the merge target keyspace group's merge state in memory.
+	// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
+	newTargetGroup := endpoint.NewKeyspaceGroup(
+		mergeTarget.ID,
+		mergeTarget.UserKind,
+		mergeTarget.SplitState,
+		nil,
+		mergeTarget.Members,
+		mergeTarget.Keyspaces,
+		mergeTarget.KeyspaceLookupTable,
+	)
+	kgm.kgs[id] = newTargetGroup
 	return nil
 }
 
@@ -1286,15 +1330,3 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
 		return
 	}
 }
-
-// Reject any request if the keyspace group is in merging state,
-// we need to wait for the merging checker to finish the TSO merging.
-func (kgm *KeyspaceGroupManager) checkTSOMerge(
-	keyspaceGroupID uint32,
-) error {
-	_, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
-	if !group.IsMerging() {
-		return nil
-	}
-	return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
-}