Skip to content

Commit

Permalink
Add unittest
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Apr 10, 2023
1 parent 469fe64 commit 72f07ae
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 26 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,11 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceNotAssigned"]
error = '''
the keyspace isn't assigned to any keyspace group, %s
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
39 changes: 21 additions & 18 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"

perrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -511,35 +512,35 @@ func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*A
if am := kgm.ams[keyspaceGroupID].Load(); am != nil {
return am, nil
}
return nil, kgm.genNotServedErr(keyspaceGroupID)
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}

// GetAMWithMembershipCheck returns the AllocatorManager of the given keyspace group and check if the keyspace
// is served by this keyspace group.
func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck(
keyspaceID, keyspaceGroupID uint32,
) (*AllocatorManager, error) {
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return nil, err
}
if am := kgm.ams[keyspaceGroupID].Load(); am != nil {
ksg := kgm.ksgs[keyspaceGroupID].Load()
if ksg == nil {
return nil, kgm.genNotServedErr(keyspaceGroupID)
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}
if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; !ok {
return nil, kgm.genNotServedErr(keyspaceGroupID)
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}
return am, nil
}
return nil, kgm.genNotServedErr(keyspaceGroupID)
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}

// GetElectionMember returns the election member of the given keyspace group
// TODO: support multiple keyspace groups for GetElectionMember
func (kgm *KeyspaceGroupManager) GetElectionMember(
keyspaceID, keyspaceGroupID uint32,
) (ElectionMember, error) {
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return nil, err
}
am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
if err != nil {
return nil, err
Expand All @@ -552,17 +553,18 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
keyspaceID, keyspaceGroupID uint32,
dcLocation string, count uint32,
) (ts pdpb.Timestamp, currentKeyspaceGroupID uint32, err error) {
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return pdpb.Timestamp{}, keyspaceGroupID, err
}
am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
if err != nil {
// If the keyspace doesn't belong to this keyspace group, we should check if it belongs to any other
// The keyspace doesn't belong to this keyspace group, we should check if it belongs to any other
// keyspace groups, and return the correct keyspace group ID to the client.
if strings.Contains(err.Error(), errs.NotServedErr) {
for i := 0; i < int(mcsutils.MaxKeyspaceGroupCountInUse); i++ {
if ksg := kgm.ksgs[i].Load(); ksg == nil {
continue
} else if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; ok {
return pdpb.Timestamp{}, ksg.ID, err
}
for i := 0; i < int(mcsutils.MaxKeyspaceGroupCountInUse); i++ {
if ksg := kgm.ksgs[i].Load(); ksg == nil {
continue
} else if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; ok {
return pdpb.Timestamp{}, ksg.ID, err
}
}
return pdpb.Timestamp{}, keyspaceGroupID, err
Expand All @@ -580,8 +582,9 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error {
id, mcsutils.MaxKeyspaceGroupCountInUse))
}

func (kgm *KeyspaceGroupManager) genNotServedErr(keyspaceGroupID uint32) error {
return errs.ErrGetAllocatorManager.FastGenByArgs(
fmt.Sprintf("requested keyspace group with id %d %s by this host/pod",
func (kgm *KeyspaceGroupManager) genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error {
return perr.FastGenByArgs(
fmt.Sprintf(
"requested keyspace group with id %d %s by this host/pod",
keyspaceGroupID, errs.NotServedErr))
}
84 changes: 76 additions & 8 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() {

addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0))
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0})

// Set the timeout to 1 second and inject the delayLoadKeyspaceGroups to return 3 seconds to let
// the loading sleep 3 seconds.
Expand All @@ -204,7 +204,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem

addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0))
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0})

// Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 2 to let
// loading from etcd fail 2 times but the whole initialization still succeeds.
Expand All @@ -226,7 +226,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() {

addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0))
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0})

// Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 3 to let
// loading from etcd fail 3 times which should cause the whole initialization to fail.
Expand Down Expand Up @@ -307,6 +307,73 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges(
})
}

// TestGetAMWithMembershipCheck tests GetAMWithMembershipCheck.
func (suite *keyspaceGroupManagerTestSuite) TestGetAMWithMembershipCheck() {
re := suite.Require()

mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1)
re.NotNil(mgr)
defer mgr.Close()

var (
am *AllocatorManager
err error
)

// Create keyspace group 0 which contains keyspace 0, 1, 2.
addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr,
uint32(0), []uint32{0, 1, 2})

err = mgr.Initialize(true)
re.NoError(err)

// Should be able to get AM for keyspace 0, 1, 2 in keyspace group 0.
am, err = mgr.GetAMWithMembershipCheck(0, 0)
re.NoError(err)
re.NotNil(am)
am, err = mgr.GetAMWithMembershipCheck(1, 0)
re.NoError(err)
re.NotNil(am)
am, err = mgr.GetAMWithMembershipCheck(2, 0)
re.NoError(err)
re.NotNil(am)
// Should fail because keyspace 3 is not in keyspace group 0.
am, err = mgr.GetAMWithMembershipCheck(3, 0)
re.Error(err)
re.Nil(am)
// Should fail because keyspace group 1 doesn't exist.
am, err = mgr.GetAMWithMembershipCheck(0, 1)
re.Error(err)
re.Nil(am)
}

// TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives
// a tso request with mismatched keyspace and keyspace group.
func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() {
re := suite.Require()

mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1)
re.NotNil(mgr)
defer mgr.Close()

// Create keyspace group 0 which contains keyspace 0, 1, 2.
addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr,
uint32(0), []uint32{0, 1, 2})

err := mgr.Initialize(true)
re.NoError(err)

// Should fail because keyspace 0 is not in keyspace group 1 and the API returns
// the keyspace group 0 to which the keyspace 0 belongs.
_, keyspaceGroupBelongTo, err := mgr.HandleTSORequest(0, 1, GlobalDCLocation, 1)
re.Error(err)
re.Equal(uint32(0), keyspaceGroupBelongTo)
}

type etcdEvent struct {
eventType mvccpb.Event_EventType
ksg *endpoint.KeyspaceGroup
Expand Down Expand Up @@ -374,7 +441,7 @@ func runTestLoadKeyspaceGroupsAssignment(
}
addKeyspaceGroupAssignment(
ctx, etcdClient, assignToMe,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j))
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j), []uint32{uint32(j)})
}
}(i)
}
Expand Down Expand Up @@ -446,7 +513,8 @@ func deleteKeyspaceGroupInEtcd(
// addKeyspaceGroupAssignment adds a keyspace group assignment to etcd.
func addKeyspaceGroupAssignment(
ctx context.Context, etcdClient *clientv3.Client,
assignToMe bool, rootPath, svcAddr string, id uint32,
assignToMe bool, rootPath, svcAddr string,
groupID uint32, keyspaces []uint32,
) error {
var location string
if assignToMe {
Expand All @@ -455,12 +523,12 @@ func addKeyspaceGroupAssignment(
location = uuid.NewString()
}
group := &endpoint.KeyspaceGroup{
ID: id,
ID: groupID,
Members: []endpoint.KeyspaceGroupMember{{Address: location}},
Keyspaces: []uint32{id},
Keyspaces: keyspaces,
}

key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/")
key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(groupID)}, "/")
value, err := json.Marshal(group)
if err != nil {
return err
Expand Down

0 comments on commit 72f07ae

Please sign in to comment.