From d3789680cc79591f62c2590e1c4e9379f2345d41 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 27 Apr 2023 17:40:33 -0700 Subject: [PATCH 1/7] Add more tests for multiple keyspace group. Add CheckMultiKeyspacesTSO() and WaitForMultiKeyspacesTSOAvailable in test utility. Add TestTSOKeyspaceGroupManager/TestKeyspacesServedByNonDefaultKeyspaceGroup. Cover TestGetTS, TestGetTSAsync, TestUpdateAfterResetTSO in TestMicroserviceTSOClient for multiple keyspace groups. Signed-off-by: Bin Shi --- tests/integrations/mcs/testutil.go | 76 +++++++- .../mcs/tso/keyspace_group_manager_test.go | 83 ++++++++- tests/integrations/mcs/tso/server_test.go | 2 +- tests/integrations/tso/client_test.go | 171 ++++++++++++------ tests/server/tso/common_test.go | 2 +- 5 files changed, 271 insertions(+), 63 deletions(-) diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index f9c47aa56f0..1f50a52c656 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -29,6 +29,7 @@ import ( tso "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/tsoutil" ) var once sync.Once @@ -48,13 +49,25 @@ func InitLogger(cfg *tso.Config) (err error) { return err } -// SetupClientWithKeyspace creates a TSO client for test. -func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { +// SetupClientWithDefaultKeyspaceName creates a TSO client with default keyspace name for test. +func SetupClientWithDefaultKeyspaceName( + ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption, +) pd.Client { cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...) re.NoError(err) return cli } +// SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test. +func SetupClientWithKeyspaceID( + ctx context.Context, re *require.Assertions, + keyspaceID uint32, endpoints []string, opts ...pd.ClientOption, +) pd.Client { + cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...) + re.NoError(err) + return cli +} + // StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing. func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) { cfg := rm.NewConfig() @@ -137,3 +150,62 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error { } return errors.WithStack(err) } + +// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces. +func CheckMultiKeyspacesTSO( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + clients []pd.Client, duration time.Duration, +) { + wg.Add(len(clients)) + + for _, client := range clients { + go func(cli pd.Client) { + defer wg.Done() + var ts, lastTS uint64 + for { + select { + case <-ctx.Done(): + // Make sure the lastTS is not empty + re.NotEmpty(lastTS) + return + default: + } + physical, logical, err := cli.GetTS(ctx) + // omit the error check since there are many kinds of errors + if err != nil { + continue + } + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + }(client) + } + + time.Sleep(duration) +} + +// WaitForMultiKeyspacesTSOAvailable waits for the given keyspaces being served by the tso server side +func WaitForMultiKeyspacesTSOAvailable( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + keyspaceIDs []uint32, backendEndpoints []string, +) []pd.Client { + wg.Add(len(keyspaceIDs)) + + clients := make([]pd.Client, 0, len(keyspaceIDs)) + for _, keyspaceID := range keyspaceIDs { + cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints) + re.NotNil(cli) + clients = append(clients, cli) + + go func() { + defer wg.Done() + testutil.Eventually(re, func() bool { + _, _, err := cli.GetTS(ctx) + return err == nil + }) + }() + } + + return clients +} diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 3cc48e682b9..6746dc183cc 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -83,7 +83,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() { } func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) { - for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") { + keyspaceGroups := handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") + for _, group := range keyspaceGroups { // Do not delete default keyspace group. if group.ID == mcsutils.DefaultKeyspaceGroupID { continue @@ -130,6 +131,86 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp } } } + + ctx, cancel := context.WithCancel(suite.ctx) + wg := &sync.WaitGroup{} + keyspaceIDs := []uint32{0, 1, 2, 3, 1000} + clients := mcs.WaitForMultiKeyspacesTSOAvailable( + ctx, re, wg, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) + wg.Wait() + re.Equal(len(keyspaceIDs), len(clients)) + mcs.CheckMultiKeyspacesTSO(ctx, re, wg, clients, 3*time.Second) + cancel() + wg.Wait() +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroup() { + // Create multiple keyspace groups, and every keyspace should be served by one of them + // on a tso server. + re := suite.Require() + + params := []struct { + keyspaceGroupID uint32 + keyspaceIDs []uint32 + }{ + {0, []uint32{0, 10}}, + {1, []uint32{1, 11}}, + {2, []uint32{2, 12}}, + } + + for _, param := range params { + if param.keyspaceGroupID == 0 { + // we have already created default keyspace group, so we can skip it. + // keyspace 10 isn't assigned to any keyspace group, so they will be + // served by default keyspace group. + continue + } + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: param.keyspaceGroupID, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: param.keyspaceIDs, + }, + }, + }) + } + + testutil.Eventually(re, func() bool { + for _, param := range params { + for _, keyspaceID := range param.keyspaceIDs { + served := false + for _, server := range suite.tsoCluster.GetServers() { + if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) { + tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID) + re.NoError(err) + re.NotNil(tam) + served = true + } + } + if !served { + return false + } + } + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + keyspaceIDs := make([]uint32, 0) + for _, param := range params { + keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...) + } + + ctx, cancel := context.WithCancel(suite.ctx) + wg := &sync.WaitGroup{} + clients := mcs.WaitForMultiKeyspacesTSOAvailable( + ctx, re, wg, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) + wg.Wait() + re.Equal(len(keyspaceIDs), len(clients)) + mcs.CheckMultiKeyspacesTSO(ctx, re, wg, clients, 3*time.Second) + cancel() + wg.Wait() } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index da23ab1d1eb..cc9440045ea 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -186,7 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) defer cleanup() - cli := mcs.SetupClientWithKeyspace(ctx, re, []string{backendEndpoints}) + cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, []string{backendEndpoints}) physical, logical, err := cli.GetTS(ctx) re.NoError(err) ts := tsoutil.ComposeTS(physical, logical) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 2d41aad4b84..6b849b4f273 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -28,10 +28,13 @@ import ( "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/testutil" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" + handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers" ) var r = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -44,12 +47,14 @@ type tsoClientTestSuite struct { cancel context.CancelFunc // The PD cluster. cluster *tests.TestCluster + // pdLeaderServer is the leader server of the PD cluster. + pdLeaderServer *tests.TestServer // The TSO service in microservice mode. tsoCluster *mcs.TestTSOCluster backendEndpoints string - - client pd.TSOClient + keyspaceIDs []uint32 + clients []pd.Client } func TestLegacyTSOClient(t *testing.T) { @@ -78,16 +83,58 @@ func (suite *tsoClientTestSuite) SetupSuite() { err = suite.cluster.RunInitialServers() re.NoError(err) leaderName := suite.cluster.WaitLeader() - pdLeader := suite.cluster.GetServer(leaderName) - re.NoError(pdLeader.BootstrapCluster()) - suite.backendEndpoints = pdLeader.GetAddr() + suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + re.NoError(suite.pdLeaderServer.BootstrapCluster()) + suite.backendEndpoints = suite.pdLeaderServer.GetAddr() + suite.keyspaceIDs = make([]uint32, 0) + if suite.legacy { - suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}) + client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}) re.NoError(err) + suite.keyspaceIDs = append(suite.keyspaceIDs, 0) + suite.clients = make([]pd.Client, 0) + suite.clients = append(suite.clients, client) } else { suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints) re.NoError(err) - suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ",")) + + params := []struct { + keyspaceGroupID uint32 + keyspaceIDs []uint32 + }{ + {0, []uint32{0, 10}}, + {1, []uint32{1, 11}}, + {2, []uint32{2}}, + } + + for _, param := range params { + if param.keyspaceGroupID == 0 { + // we have already created default keyspace group, so we can skip it. + // keyspace 10 isn't assigned to any keyspace group, so they will be + // served by default keyspace group. + continue + } + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: param.keyspaceGroupID, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: param.keyspaceIDs, + }, + }, + }) + } + + for _, param := range params { + suite.keyspaceIDs = append(suite.keyspaceIDs, param.keyspaceIDs...) + } + + wg := &sync.WaitGroup{} + suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( + suite.ctx, re, wg, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) + wg.Wait() + re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } } @@ -101,19 +148,21 @@ func (suite *tsoClientTestSuite) TearDownSuite() { func (suite *tsoClientTestSuite) TestGetTS() { var wg sync.WaitGroup - wg.Add(tsoRequestConcurrencyNumber) + wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) for i := 0; i < tsoRequestConcurrencyNumber; i++ { - go func() { - defer wg.Done() - var lastTS uint64 - for i := 0; i < tsoRequestRound; i++ { - physical, logical, err := suite.client.GetTS(suite.ctx) - suite.NoError(err) - ts := tsoutil.ComposeTS(physical, logical) - suite.Less(lastTS, ts) - lastTS = ts - } - }() + for _, client := range suite.clients { + go func(client pd.Client) { + defer wg.Done() + var lastTS uint64 + for j := 0; j < tsoRequestRound; j++ { + physical, logical, err := client.GetTS(suite.ctx) + suite.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + suite.Less(lastTS, ts) + lastTS = ts + } + }(client) + } } wg.Wait() } @@ -124,17 +173,20 @@ func (suite *tsoClientTestSuite) TestGetTSAsync() { for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() - tsFutures := make([]pd.TSFuture, tsoRequestRound) - for i := range tsFutures { - tsFutures[i] = suite.client.GetTSAsync(suite.ctx) - } - var lastTS uint64 = math.MaxUint64 - for i := len(tsFutures) - 1; i >= 0; i-- { - physical, logical, err := tsFutures[i].Wait() - suite.NoError(err) - ts := tsoutil.ComposeTS(physical, logical) - suite.Greater(lastTS, ts) - lastTS = ts + + for _, client := range suite.clients { + tsFutures := make([]pd.TSFuture, tsoRequestRound) + for j := range tsFutures { + tsFutures[j] = client.GetTSAsync(suite.ctx) + } + var lastTS uint64 = math.MaxUint64 + for j := len(tsFutures) - 1; j >= 0; j-- { + physical, logical, err := tsFutures[j].Wait() + suite.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + suite.Greater(lastTS, ts) + lastTS = ts + } } }() } @@ -147,33 +199,36 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - testutil.Eventually(re, func() bool { - _, _, err := suite.client.GetTS(ctx) - return err == nil - }) - // Transfer leader to trigger the TSO resetting. - re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)")) - oldLeaderName := suite.cluster.WaitLeader() - err := suite.cluster.GetServer(oldLeaderName).ResignLeader() - re.NoError(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO")) - newLeaderName := suite.cluster.WaitLeader() - re.NotEqual(oldLeaderName, newLeaderName) - // Request a new TSO. - testutil.Eventually(re, func() bool { - _, _, err := suite.client.GetTS(ctx) - return err == nil - }) - // Transfer leader back. - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`)) - err = suite.cluster.GetServer(newLeaderName).ResignLeader() - re.NoError(err) - // Should NOT panic here. - testutil.Eventually(re, func() bool { - _, _, err := suite.client.GetTS(ctx) - return err == nil - }) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp")) + for i := 0; i < len(suite.clients); i++ { + client := suite.clients[i] + testutil.Eventually(re, func() bool { + _, _, err := client.GetTS(ctx) + return err == nil + }) + // Transfer leader to trigger the TSO resetting. + re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)")) + oldLeaderName := suite.cluster.WaitLeader() + err := suite.cluster.GetServer(oldLeaderName).ResignLeader() + re.NoError(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO")) + newLeaderName := suite.cluster.WaitLeader() + re.NotEqual(oldLeaderName, newLeaderName) + // Request a new TSO. + testutil.Eventually(re, func() bool { + _, _, err := client.GetTS(ctx) + return err == nil + }) + // Transfer leader back. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`)) + err = suite.cluster.GetServer(newLeaderName).ResignLeader() + re.NoError(err) + // Should NOT panic here. + testutil.Eventually(re, func() bool { + _, _, err := client.GetTS(ctx) + return err == nil + }) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp")) + } } func (suite *tsoClientTestSuite) TestRandomResignLeader() { @@ -286,7 +341,7 @@ func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, b for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() - cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, strings.Split(backendEndpoints, ",")) var ts, lastTS uint64 for { select { diff --git a/tests/server/tso/common_test.go b/tests/server/tso/common_test.go index f528103db84..877fcb10982 100644 --- a/tests/server/tso/common_test.go +++ b/tests/server/tso/common_test.go @@ -28,7 +28,7 @@ import ( ) const ( - tsoRequestConcurrencyNumber = 5 + tsoRequestConcurrencyNumber = 3 tsoRequestRound = 30 tsoCount = 10 ) From 24330975d93f115d732c482b0628933e71bbaa47 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 27 Apr 2023 23:33:14 -0700 Subject: [PATCH 2/7] refine and handle feedback Signed-off-by: Bin Shi --- tests/integrations/mcs/testutil.go | 10 ++++++++-- .../mcs/tso/keyspace_group_manager_test.go | 18 ++++-------------- tests/integrations/tso/client_test.go | 4 +--- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index 1f50a52c656..7151c7d3416 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -153,9 +153,11 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error { // CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces. func CheckMultiKeyspacesTSO( - ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + ctx context.Context, re *require.Assertions, clients []pd.Client, duration time.Duration, ) { + ctx, cancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} wg.Add(len(clients)) for _, client := range clients { @@ -183,13 +185,16 @@ func CheckMultiKeyspacesTSO( } time.Sleep(duration) + cancel() + wg.Wait() } // WaitForMultiKeyspacesTSOAvailable waits for the given keyspaces being served by the tso server side func WaitForMultiKeyspacesTSOAvailable( - ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + ctx context.Context, re *require.Assertions, keyspaceIDs []uint32, backendEndpoints []string, ) []pd.Client { + wg := sync.WaitGroup{} wg.Add(len(keyspaceIDs)) clients := make([]pd.Client, 0, len(keyspaceIDs)) @@ -207,5 +212,6 @@ func WaitForMultiKeyspacesTSOAvailable( }() } + wg.Wait() return clients } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 6746dc183cc..f2ca2e37d32 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -132,16 +132,11 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp } } - ctx, cancel := context.WithCancel(suite.ctx) - wg := &sync.WaitGroup{} keyspaceIDs := []uint32{0, 1, 2, 3, 1000} clients := mcs.WaitForMultiKeyspacesTSOAvailable( - ctx, re, wg, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) - wg.Wait() + suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) re.Equal(len(keyspaceIDs), len(clients)) - mcs.CheckMultiKeyspacesTSO(ctx, re, wg, clients, 3*time.Second) - cancel() - wg.Wait() + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroup() { @@ -202,15 +197,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...) } - ctx, cancel := context.WithCancel(suite.ctx) - wg := &sync.WaitGroup{} clients := mcs.WaitForMultiKeyspacesTSOAvailable( - ctx, re, wg, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) - wg.Wait() + suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) re.Equal(len(keyspaceIDs), len(clients)) - mcs.CheckMultiKeyspacesTSO(ctx, re, wg, clients, 3*time.Second) - cancel() - wg.Wait() + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 6b849b4f273..06b754e8ed4 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -130,10 +130,8 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.keyspaceIDs = append(suite.keyspaceIDs, param.keyspaceIDs...) } - wg := &sync.WaitGroup{} suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( - suite.ctx, re, wg, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) - wg.Wait() + suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } } From 2db0778b048d9a7ef6bcdfe8d78cb4e61541c354 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Fri, 28 Apr 2023 09:56:46 -0700 Subject: [PATCH 3/7] Cover TestRandomResignLeader for multiple keyspace group Signed-off-by: Bin Shi --- tests/integrations/tso/client_test.go | 31 +++++++++++++++++---------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 06b754e8ed4..85fd314c75c 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/testutil" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -234,29 +235,37 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) - ctx, cancel := context.WithCancel(suite.ctx) - var wg sync.WaitGroup - checkTSO(ctx, re, &wg, suite.backendEndpoints) - wg.Add(1) - go func() { - defer wg.Done() + parallelAct := func() { // After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here. // currently, the time to discover tso service is usually a little longer than 1s, compared // to the previous time taken < 1s. n := r.Intn(2) + 3 time.Sleep(time.Duration(n) * time.Second) if !suite.legacy { - suite.tsoCluster.ResignPrimary() - suite.tsoCluster.WaitForDefaultPrimaryServing(re) + wg := sync.WaitGroup{} + // Select the default keyspace and a randomly picked keyspace to test + keyspaceIDs := []uint32{mcsutils.DefaultKeyspaceID} + selectIdx := uint32(r.Intn(len(suite.keyspaceIDs)-1)+1) + keyspaceIDs = append(keyspaceIDs, suite.keyspaceIDs[selectIdx]) + wg.Add(len(keyspaceIDs)) + for _, keyspaceID := range keyspaceIDs { + go func(keyspaceID uint32) { + defer wg.Done() + err := suite.tsoCluster.ResignPrimary(keyspaceID, 0) + re.NoError(err) + suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, 0) + }(keyspaceID) + } + wg.Wait() } else { err := suite.cluster.ResignLeader() re.NoError(err) suite.cluster.WaitLeader() } time.Sleep(time.Duration(n) * time.Second) - cancel() - }() - wg.Wait() + } + + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct) } func (suite *tsoClientTestSuite) TestRandomShutdown() { From 098029be00ddf1dd4648ef2f5d2e4d03d51c80c5 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Fri, 28 Apr 2023 11:13:27 -0700 Subject: [PATCH 4/7] Cover TestRandomShutdown for multiple keyspace groups Signed-off-by: Bin Shi --- tests/integrations/tso/client_test.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 85fd314c75c..f6c535f1d2f 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -245,7 +245,7 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() { wg := sync.WaitGroup{} // Select the default keyspace and a randomly picked keyspace to test keyspaceIDs := []uint32{mcsutils.DefaultKeyspaceID} - selectIdx := uint32(r.Intn(len(suite.keyspaceIDs)-1)+1) + selectIdx := uint32(r.Intn(len(suite.keyspaceIDs)-1) + 1) keyspaceIDs = append(keyspaceIDs, suite.keyspaceIDs[selectIdx]) wg.Add(len(keyspaceIDs)) for _, keyspaceID := range keyspaceIDs { @@ -273,12 +273,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) - ctx, cancel := context.WithCancel(suite.ctx) - var wg sync.WaitGroup - checkTSO(ctx, re, &wg, suite.backendEndpoints) - wg.Add(1) - go func() { - defer wg.Done() + parallelAct := func() { // After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here. // currently, the time to discover tso service is usually a little longer than 1s, compared // to the previous time taken < 1s. @@ -290,9 +285,9 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close() } time.Sleep(time.Duration(n) * time.Second) - cancel() - }() - wg.Wait() + } + + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct) suite.TearDownSuite() suite.SetupSuite() } From cd4a076933112761d08f48b78ac1111ddfe04f9e Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 4 May 2023 09:32:39 +0700 Subject: [PATCH 5/7] Fix test failure Signed-off-by: Bin Shi --- pkg/mcs/tso/server/server.go | 7 +++---- pkg/tso/keyspace_group_manager.go | 2 +- tests/integrations/mcs/cluster.go | 13 ++++++++---- tests/integrations/mcs/testutil.go | 11 +++++++--- .../mcs/tso/keyspace_group_manager_test.go | 10 ++++++--- tests/integrations/mcs/tso/server_test.go | 21 ++++++++++--------- tests/integrations/tso/client_test.go | 2 +- 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index d9866b7a9db..25a832c5f48 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb return member, nil } -// ResignPrimary resigns the primary of the given keyspace and keyspace group. -func (s *Server) ResignPrimary() error { - member, err := s.keyspaceGroupManager.GetElectionMember( - mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) +// ResignPrimary resigns the primary of the given keyspace. +func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error { + member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID) if err != nil { return err } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 4c9e8dac4ca..0089e0d9bdc 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID( return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil } -// GetElectionMember returns the election member of the given keyspace group +// GetElectionMember returns the election member of the keyspace group serving the given keyspace. func (kgm *KeyspaceGroupManager) GetElectionMember( keyspaceID, keyspaceGroupID uint32, ) (ElectionMember, error) { diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go index 228f506454d..dbc9964b62b 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/integrations/mcs/cluster.go @@ -16,6 +16,7 @@ package mcs import ( "context" + "fmt" "time" "github.com/stretchr/testify/require" @@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) { } // ResignPrimary resigns the primary TSO server. -func (tc *TestTSOCluster) ResignPrimary() { - tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary() +func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error { + primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID) + if primaryServer == nil { + return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID) + } + return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID) } -// GetPrimary returns the primary TSO server. -func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server { +// GetPrimaryServer returns the primary TSO server of the given keyspace +func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server { for _, server := range tc.servers { if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) { return server diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index 7151c7d3416..3ca1ad39436 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -154,7 +154,7 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error { // CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces. func CheckMultiKeyspacesTSO( ctx context.Context, re *require.Assertions, - clients []pd.Client, duration time.Duration, + clients []pd.Client, parallelAct func(), ) { ctx, cancel := context.WithCancel(ctx) wg := sync.WaitGroup{} @@ -184,8 +184,13 @@ func CheckMultiKeyspacesTSO( }(client) } - time.Sleep(duration) - cancel() + wg.Add(1) + go func() { + defer wg.Done() + parallelAct() + cancel() + }() + wg.Wait() } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index f2ca2e37d32..db032faa251 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -136,7 +136,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp clients := mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) re.Equal(len(keyspaceIDs), len(clients)) - mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second) + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() { + time.Sleep(3 * time.Second) + }) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroup() { @@ -200,7 +202,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe clients := mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) re.Equal(len(keyspaceIDs), len(clients)) - mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second) + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() { + time.Sleep(3 * time.Second) + }) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { @@ -231,7 +235,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { }) ts.Physical += time.Hour.Milliseconds() // Set the TSO of the keyspace group 1 to a large value. - err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) re.NoError(err) // Split the keyspace group 1 to 2. handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index cc9440045ea..cb43f5a3b9a 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -349,13 +349,14 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() { type CommonTestSuite struct { suite.Suite - ctx context.Context - cancel context.CancelFunc - cluster *tests.TestCluster - tsoCluster *mcs.TestTSOCluster - pdLeader *tests.TestServer - tsoPrimary *tso.Server - backendEndpoints string + ctx context.Context + cancel context.CancelFunc + cluster *tests.TestCluster + tsoCluster *mcs.TestTSOCluster + pdLeader *tests.TestServer + // tsoDefaultPrimaryServer is the primary server of the default keyspace group + tsoDefaultPrimaryServer *tso.Server + backendEndpoints string } func TestCommonTestSuite(t *testing.T) { @@ -380,7 +381,7 @@ func (suite *CommonTestSuite) SetupSuite() { suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) suite.NoError(err) suite.tsoCluster.WaitForDefaultPrimaryServing(re) - suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) + suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) } func (suite *CommonTestSuite) TearDownSuite() { @@ -401,14 +402,14 @@ func (suite *CommonTestSuite) TearDownSuite() { func (suite *CommonTestSuite) TestAdvertiseAddr() { re := suite.Require() - conf := suite.tsoPrimary.GetConfig() + conf := suite.tsoDefaultPrimaryServer.GetConfig() re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr()) } func (suite *CommonTestSuite) TestMetrics() { re := suite.Require() - resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics") + resp, err := http.Get(suite.tsoDefaultPrimaryServer.GetConfig().GetAdvertiseListenAddr() + "/metrics") re.NoError(err) defer resp.Body.Close() re.Equal(http.StatusOK, resp.StatusCode) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index f6c535f1d2f..6607807c65b 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -251,7 +251,7 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() { for _, keyspaceID := range keyspaceIDs { go func(keyspaceID uint32) { defer wg.Done() - err := suite.tsoCluster.ResignPrimary(keyspaceID, 0) + err := suite.tsoCluster.ResignPrimary(keyspaceID, mcsutils.DefaultKeyspaceGroupID) re.NoError(err) suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, 0) }(keyspaceID) From 0758f20456044db926e94608698496a932c26f23 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 4 May 2023 09:58:40 +0700 Subject: [PATCH 6/7] fix go fmt error Signed-off-by: Bin Shi --- tests/integrations/mcs/tso/server_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index cb43f5a3b9a..d074c49a497 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -349,11 +349,11 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() { type CommonTestSuite struct { suite.Suite - ctx context.Context - cancel context.CancelFunc - cluster *tests.TestCluster - tsoCluster *mcs.TestTSOCluster - pdLeader *tests.TestServer + ctx context.Context + cancel context.CancelFunc + cluster *tests.TestCluster + tsoCluster *mcs.TestTSOCluster + pdLeader *tests.TestServer // tsoDefaultPrimaryServer is the primary server of the default keyspace group tsoDefaultPrimaryServer *tso.Server backendEndpoints string From 4cfc99e6eea4fa8ef9349f1726a29e3241c8cdef Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 4 May 2023 10:25:03 +0700 Subject: [PATCH 7/7] refine test Signed-off-by: Bin Shi --- .../mcs/tso/keyspace_group_manager_test.go | 2 +- tests/integrations/tso/client_test.go | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index db032faa251..60cc8c1161d 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -141,7 +141,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp }) } -func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroup() { +func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroups() { // Create multiple keyspace groups, and every keyspace should be served by one of them // on a tso server. re := suite.Require() diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 6607807c65b..80518798179 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -168,12 +168,11 @@ func (suite *tsoClientTestSuite) TestGetTS() { func (suite *tsoClientTestSuite) TestGetTSAsync() { var wg sync.WaitGroup - wg.Add(tsoRequestConcurrencyNumber) + wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) for i := 0; i < tsoRequestConcurrencyNumber; i++ { - go func() { - defer wg.Done() - - for _, client := range suite.clients { + for _, client := range suite.clients { + go func(client pd.Client) { + defer wg.Done() tsFutures := make([]pd.TSFuture, tsoRequestRound) for j := range tsFutures { tsFutures[j] = client.GetTSAsync(suite.ctx) @@ -186,8 +185,8 @@ func (suite *tsoClientTestSuite) TestGetTSAsync() { suite.Greater(lastTS, ts) lastTS = ts } - } - }() + }(client) + } } wg.Wait() }