diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 4c180736a8fd..89fcbc6af7bb 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -390,7 +390,9 @@ func (manager *Manager) LoadKeyspaceByID(spaceID uint32) (*keyspacepb.KeyspaceMe } return nil }) - meta.Id = spaceID + if meta != nil { + meta.Id = spaceID + } return meta, err } @@ -671,8 +673,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { ) }() for moreToPatrol { + var defaultKeyspaceGroup *endpoint.KeyspaceGroup err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { - defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID) + var err error + defaultKeyspaceGroup, err = manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID) if err != nil { return err } @@ -752,6 +756,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if err != nil { return err } + manager.kgm.groups[endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)].Put(defaultKeyspaceGroup) // If all keyspaces in the current batch are assigned, update the next start ID. manager.nextPatrolStartID = nextStartID } diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 46810be92d51..3362b46f147f 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/balancer" "github.com/tikv/pd/pkg/mcs/discovery" @@ -149,6 +150,10 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { defer logutil.LogPanic() defer m.wg.Done() ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval) + failpoint.Inject("acceleratedAllocNodes", func() { + ticker.Stop() + ticker = time.NewTicker(time.Millisecond * 100) + }) defer ticker.Stop() for { select { @@ -626,9 +631,12 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount defer cancel() ticker := time.NewTicker(allocNodesInterval) defer ticker.Stop() + + var kg *endpoint.KeyspaceGroup nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount) err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { - kg, err := m.store.LoadKeyspaceGroup(txn, id) + var err error + kg, err = m.store.LoadKeyspaceGroup(txn, id) if err != nil { return err } @@ -672,6 +680,7 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount if err != nil { return nil, err } + m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg) log.Info("alloc nodes for keyspace group", zap.Uint32("id", id), zap.Reflect("nodes", nodes)) return nodes, nil } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 4b650bf1e259..e5317d353648 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -23,16 +23,17 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" - "github.com/tikv/pd/client/testutil" "github.com/tikv/pd/pkg/election" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" tsopkg "github.com/tikv/pd/pkg/tso" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" @@ -425,3 +426,33 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() cancel() wg.Wait() } + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) + re.Equal(uint32(0), kg.ID) + re.Equal([]uint32{0}, kg.Keyspaces) + re.False(kg.IsSplitting()) + re.Equal(0, len(kg.Members)) + // wait for finishing alloc nodes + time.Sleep(500 * time.Millisecond) + kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) + re.Equal(2, len(kg.Members)) + testConfig := map[string]string{ + "config": "1", + "tso_keyspace_group_id": "0", + "user_kind": "basic", + } + handlersutil.MustCreateKeyspace(re, suite.pdLeaderServer, &handlers.CreateKeyspaceParams{ + Name: "test_keyspace", + Config: testConfig, + }) + kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) + // wait for finishing alloc nodes + time.Sleep(500 * time.Millisecond) + re.Equal(2, len(kg.Members)) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) +} diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index ad456bd5be64..7fd8de013f7e 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -152,7 +152,7 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *tests.TestServer, cou Name: fmt.Sprintf("test_keyspace_%d", i), Config: testConfig, } - resultMeta[i] = mustCreateKeyspace(re, server, createRequest) + resultMeta[i] = MustCreateKeyspace(re, server, createRequest) } return resultMeta } diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go index 3ab41bfb0cca..b638f1bbba4e 100644 --- a/tests/server/apiv2/handlers/testutil.go +++ b/tests/server/apiv2/handlers/testutil.go @@ -79,7 +79,8 @@ func sendUpdateStateRequest(re *require.Assertions, server *tests.TestServer, na return true, meta.KeyspaceMeta } -func mustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta { +// MustCreateKeyspace creates a keyspace with HTTP API. +func MustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta { data, err := json.Marshal(request) re.NoError(err) httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspacesPrefix, bytes.NewBuffer(data))